Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding blog entry on E-commerce Funnel Analysis with StarRocks: 87 Million Records, Apache Hudi, Apache Iceberg, Delta Lake (MinIO, Apache HMS, Apache xTable) #360

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions api/src/main/java/io/onetable/model/storage/DataFilesDiff.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.onetable.model.storage;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.Data;
import lombok.Singular;
import lombok.experimental.SuperBuilder;

/**
 * Holds the collection of files that represent the difference between two states/commits/snapshots
 * of a table with respect to the data files. Between any two states of a table, the newer/latest
 * state may contain new files not present in the older state and may have removed files that were
 * present in the older state. In most cases the data files included in the newer state are derived
 * from a new commit in a source table format that has not been applied to a target table format
 * yet. Hence, the collection of data files in the newer state are typically {@link OneDataFile}s,
 * whereas the files in the older state are represented using a generic type P which can be a data
 * file type specific to the target table format.
 *
 * @param <L> the type of the files in the latest state
 * @param <P> the type of the files in the target table format
 */
@Data
@SuperBuilder
public class DataFilesDiff<L, P> {
  /** Files that are present in the latest state but not in the previously known state. */
  @Singular("fileAdded")
  private Set<L> filesAdded;

  /** Files that are present in the previously known state but not in the latest state. */
  @Singular("fileRemoved")
  private Set<P> filesRemoved;

  /**
   * Compares the latest files with the previous files and identifies the files that are new, i.e.
   * are present in latest files but not present in the previously known files, and the files that
   * are removed, i.e. present in the previously known files but not present in the latest files.
   * Files are matched purely by their map key (the file path); the file objects themselves are
   * never compared. Neither input map is modified.
   *
   * @param latestFiles a map of file path and file object representing files in the latest snapshot
   *     of a table
   * @param previousFiles a map of file path and file object representing files in a previously
   *     synced snapshot of a table.
   * @param <L> the type of the latest files
   * @param <P> the type of the previous files
   * @return the diff of the files
   */
  public static <L, P> DataFilesDiff<L, P> findNewAndRemovedFiles(
      Map<String, L> latestFiles, Map<String, P> previousFiles) {
    Set<L> newFiles = new HashSet<>();
    // Work on a copy so the caller's map is left untouched. Every previous file starts out as a
    // removal candidate and is struck off once its path is seen in the latest files.
    Map<String, P> removedFiles = new HashMap<>(previousFiles);

    // if a file in latest files is also present in previous files, then it is neither new nor
    // removed.
    latestFiles.forEach(
        (path, file) -> {
          // Test key presence explicitly: relying on remove() returning null would misclassify a
          // known path that happens to be mapped to a null value as a newly added file.
          if (removedFiles.containsKey(path)) {
            removedFiles.remove(path);
          } else {
            newFiles.add(file);
          }
        });
    return DataFilesDiff.<L, P>builder()
        .filesAdded(newFiles)
        .filesRemoved(removedFiles.values())
        .build();
  }

  /**
   * This method wraps the {@link #findNewAndRemovedFiles(Map, Map)} method, to compare the latest
   * file groups with the previous files and identifies the files that are new, i.e. are present in
   * latest files but not present in the previously known files, and the files that are removed,
   * i.e. present in the previously known files but not present in the latest files. The files of
   * all groups are flattened and keyed by {@link OneDataFile#getPhysicalPath()} before comparison.
   *
   * @param latestFileGroups a list of file groups representing the latest snapshot of a table
   * @param previousFiles a map of file path and file object representing files in a previously
   *     synced snapshot of a table
   * @param <P> the type of the previous files
   * @return the diff of the files, holding both the added and the removed files
   * @throws IllegalStateException if two files in the latest file groups share the same physical
   *     path
   */
  public static <P> DataFilesDiff<OneDataFile, P> findNewAndRemovedFiles(
      List<OneFileGroup> latestFileGroups, Map<String, P> previousFiles) {
    Map<String, OneDataFile> latestFiles =
        latestFileGroups.stream()
            .flatMap(group -> group.getFiles().stream())
            .collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
    return findNewAndRemovedFiles(latestFiles, previousFiles);
  }
}
38 changes: 13 additions & 25 deletions api/src/main/java/io/onetable/model/storage/OneDataFilesDiff.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,20 @@

package io.onetable.model.storage;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.Builder;
import lombok.Singular;
import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.experimental.SuperBuilder;

/** Container for holding the list of files added and files removed between source and target. */
@Value
@Builder
public class OneDataFilesDiff {
@Singular("fileAdded")
Set<OneDataFile> filesAdded;

@Singular("fileRemoved")
Set<OneDataFile> filesRemoved;
@EqualsAndHashCode(callSuper = true)
@SuperBuilder
public class OneDataFilesDiff extends DataFilesDiff<OneDataFile, OneDataFile> {

/**
* Creates a OneDataFilesDiff from the list of files in the target table and the list of files in
Expand All @@ -52,19 +45,14 @@ public static OneDataFilesDiff from(List<OneDataFile> source, List<OneDataFile>
Map<String, OneDataFile> targetPaths =
target.stream()
.collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));
// Any files in the source that are not in the target are added
Set<OneDataFile> addedFiles =
Map<String, OneDataFile> sourcePaths =
source.stream()
.map(
file -> {
OneDataFile targetFileIfPresent = targetPaths.remove(file.getPhysicalPath());
return targetFileIfPresent == null ? file : null;
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
// Any files remaining in the targetPaths map are not present in the source and should be marked
// for removal
Set<OneDataFile> removedFiles = new HashSet<>(targetPaths.values());
return OneDataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
.collect(Collectors.toMap(OneDataFile::getPhysicalPath, Function.identity()));

DataFilesDiff<OneDataFile, OneDataFile> diff = findNewAndRemovedFiles(sourcePaths, targetPaths);
return OneDataFilesDiff.builder()
.filesAdded(diff.getFilesAdded())
.filesRemoved(diff.getFilesRemoved())
.build();
}
}
119 changes: 119 additions & 0 deletions api/src/test/java/io/onetable/model/storage/TestDataFilesDiff.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.onetable.model.storage;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

/** Unit tests for the static diff helpers on {@link DataFilesDiff}. */
public class TestDataFilesDiff {

  /** Diffing file groups against previous files reports unmatched paths on both sides. */
  @Test
  void findDiffFromFileGroups() {
    // Previously synced state: two paths overlap with the latest files, one does not.
    File knownInGroup1 = mock(File.class);
    File knownOnlyPreviously = mock(File.class);
    File knownInGroup2 = mock(File.class);
    Map<String, File> previousFiles = new HashMap<>();
    previousFiles.put("file1Group1", knownInGroup1);
    previousFiles.put("file2NoGroup", knownOnlyPreviously);
    previousFiles.put("file2Group2", knownInGroup2);

    // Latest state: four data files handed over as file groups.
    OneDataFile file1Group1 = OneDataFile.builder().physicalPath("file1Group1").build();
    OneDataFile file2Group1 = OneDataFile.builder().physicalPath("file2Group1").build();
    OneDataFile file1Group2 = OneDataFile.builder().physicalPath("file1Group2").build();
    OneDataFile file2Group2 = OneDataFile.builder().physicalPath("file2Group2").build();
    List<OneFileGroup> latestFileGroups =
        OneFileGroup.fromFiles(Arrays.asList(file1Group1, file2Group1, file1Group2, file2Group2));

    DataFilesDiff<OneDataFile, File> result =
        DataFilesDiff.findNewAndRemovedFiles(latestFileGroups, previousFiles);

    // The two latest files whose paths were not previously known are reported as added.
    assertEquals(2, result.getFilesAdded().size());
    assertTrue(result.getFilesAdded().contains(file1Group2));
    assertTrue(result.getFilesAdded().contains(file2Group1));
    // The single previous file with no counterpart in the latest files is reported as removed.
    assertEquals(1, result.getFilesRemoved().size());
    assertTrue(result.getFilesRemoved().contains(knownOnlyPreviously));
  }

  /** With no previous files, every latest file is added and nothing is removed. */
  @Test
  void findDiffFromFilesNoPrevious() {
    File first = mock(File.class);
    File second = mock(File.class);

    Map<String, File> latestFiles = new HashMap<>();
    latestFiles.put("file1", first);
    latestFiles.put("file2", second);
    Map<String, File> previousFiles = new HashMap<>();

    DataFilesDiff<File, File> result =
        DataFilesDiff.findNewAndRemovedFiles(latestFiles, previousFiles);

    assertEquals(0, result.getFilesRemoved().size());
    assertEquals(2, result.getFilesAdded().size());
    assertTrue(result.getFilesAdded().contains(first));
    assertTrue(result.getFilesAdded().contains(second));
  }

  /** With identical paths on both sides, the diff is empty in both directions. */
  @Test
  void findDiffFromFilesNoNew() {
    File first = mock(File.class);
    File second = mock(File.class);

    Map<String, File> latestFiles = new HashMap<>();
    latestFiles.put("file1", first);
    latestFiles.put("file2", second);

    Map<String, File> previousFiles = new HashMap<>();
    previousFiles.put("file1", first);
    previousFiles.put("file2", second);

    DataFilesDiff<File, File> result =
        DataFilesDiff.findNewAndRemovedFiles(latestFiles, previousFiles);

    assertEquals(0, result.getFilesRemoved().size());
    assertEquals(0, result.getFilesAdded().size());
  }

  /** With partially overlapping paths, one file is added and one is removed. */
  @Test
  void findDiffFromFiles() {
    File onlyPrevious = mock(File.class);
    File shared = mock(File.class);
    File onlyLatest = mock(File.class);

    Map<String, File> latestFiles = new HashMap<>();
    latestFiles.put("file2", shared);
    latestFiles.put("file3", onlyLatest);

    Map<String, File> previousFiles = new HashMap<>();
    previousFiles.put("file1", onlyPrevious);
    previousFiles.put("file2", shared);

    DataFilesDiff<File, File> result =
        DataFilesDiff.findNewAndRemovedFiles(latestFiles, previousFiles);

    assertEquals(1, result.getFilesAdded().size());
    assertTrue(result.getFilesAdded().contains(onlyLatest));
    assertEquals(1, result.getFilesRemoved().size());
    assertTrue(result.getFilesRemoved().contains(onlyPrevious));
  }
}
12 changes: 8 additions & 4 deletions core/src/main/java/io/onetable/delta/DeltaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

Expand Down Expand Up @@ -202,7 +203,8 @@ public void completeSync() {
@Override
public Optional<OneTableMetadata> getTableMetadata() {
return OneTableMetadata.fromMap(
JavaConverters.mapAsJavaMapConverter(deltaLog.metadata().configuration()).asJava());
JavaConverters.mapAsJavaMapConverter(deltaLog.snapshot().metadata().configuration())
.asJava());
}

@Override
Expand Down Expand Up @@ -256,8 +258,10 @@ private void commitTransaction() {
JavaConverters.asScalaBuffer(partitionColumns).toList(),
ScalaUtils.convertJavaMapToScala(getConfigurationsForDeltaSync()),
new Some<>(commitTime.toEpochMilli()));
transaction.updateMetadata(metadata);
transaction.commit(actions, new DeltaOperations.Update(Option.apply("onetable-delta-sync")));
transaction.updateMetadata(metadata, false);
transaction.commit(
actions,
new DeltaOperations.Update(Option.apply(Literal.fromObject("onetable-delta-sync"))));
}

private Map<String, String> getConfigurationsForDeltaSync() {
Expand Down Expand Up @@ -290,7 +294,7 @@ private Format getFileFormat() {
}
}
// fallback to existing deltalog value
return deltaLog.metadata().format();
return deltaLog.snapshot().metadata().format();
}
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/io/onetable/delta/DeltaClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ static SparkSession buildSparkSession(Configuration conf) {
new SparkConf()
.setAppName("onetableclient")
.set("spark.serializer", KryoSerializer.class.getName())
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.set("spark.databricks.delta.constraints.allowUnenforcedNotNull.enabled", "true");
SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
conf.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import io.onetable.model.schema.OneSchema;
import io.onetable.model.stat.ColumnStat;
import io.onetable.model.storage.DataFilesDiff;
import io.onetable.model.storage.OneDataFile;
import io.onetable.model.storage.OneDataFilesDiff;
import io.onetable.model.storage.OneFileGroup;
Expand All @@ -63,27 +64,19 @@ public Seq<Action> applySnapshot(
// all files in the current delta snapshot are potential candidates for remove actions, i.e. if
// the file is not present in the new snapshot (addedFiles) then the file is considered removed
Snapshot snapshot = deltaLog.snapshot();
Map<String, Action> removedFiles =
Map<String, Action> previousFiles =
snapshot.allFiles().collectAsList().stream()
.map(AddFile::remove)
.collect(
Collectors.toMap(
file -> DeltaActionsConverter.getFullPathToFile(snapshot, file.path()),
file -> file));

Set<OneDataFile> addedFiles =
partitionedDataFiles.stream()
.flatMap(group -> group.getFiles().stream())
.map(
file -> {
Action targetFileIfPresent = removedFiles.remove(file.getPhysicalPath());
return targetFileIfPresent == null ? file : null;
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
DataFilesDiff<OneDataFile, Action> diff =
OneDataFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles);

return applyDiff(
addedFiles, removedFiles.values(), tableSchema, deltaLog.dataPath().toString());
diff.getFilesAdded(), diff.getFilesRemoved(), tableSchema, deltaLog.dataPath().toString());
}

public Seq<Action> applyDiff(
Expand Down Expand Up @@ -122,6 +115,7 @@ private Stream<AddFile> createAddFileAction(
dataFile.getLastModified(),
true,
getColumnStats(schema, dataFile.getRecordCount(), dataFile.getColumnStats()),
null,
null));
}

Expand Down
3 changes: 1 addition & 2 deletions core/src/main/java/io/onetable/delta/DeltaSourceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
OneTable tableAtVersion = tableExtractor.table(deltaLog, tableName, versionNumber);
// Client to call getCommitsBacklog and call this method.
List<Action> actionsForVersion = getChangesState().getActionsForVersion(versionNumber);
Snapshot snapshotAtVersion =
deltaLog.getSnapshotAt(versionNumber, Option.empty(), Option.empty());
Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
FileFormat fileFormat =
actionsConverter.convertToOneTableFileFormat(
snapshotAtVersion.metadata().format().provider());
Expand Down
Loading