Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 27, 2024
1 parent 167fe3b commit 07460b6
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 82 deletions.
66 changes: 55 additions & 11 deletions api/src/main/java/org/apache/iceberg/util/DataFileSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@
*/
package org.apache.iceberg.util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

public class DataFileSet extends WrapperSet<DataFile> {
public class DataFileSet extends WrapperSet<DataFile> implements Serializable {
private static final ThreadLocal<DataFileWrapper> WRAPPERS =
ThreadLocal.withInitial(() -> DataFileWrapper.wrap(null));

protected DataFileSet(Iterable<Wrapper<DataFile>> wrappers) {
private DataFileSet() {
// needed for serialization/deserialization
}

private DataFileSet(Iterable<Wrapper<DataFile>> wrappers) {
super(wrappers);
}

public static DataFileSet empty() {
public static DataFileSet create() {
return new DataFileSet(Sets.newLinkedHashSet());
}

public static DataFileSet of(Iterable<DataFile> iterable) {
return new DataFileSet(
Sets.newLinkedHashSet(Iterables.transform(iterable, DataFileWrapper::wrap)));
public static DataFileSet of(Iterable<? extends DataFile> iterable) {
return new DataFileSet(Iterables.transform(iterable, DataFileWrapper::wrap));
}

@Override
Expand All @@ -52,11 +59,48 @@ protected Wrapper<DataFile> wrap(DataFile dataFile) {
}

@Override
protected boolean isInstance(Object obj) {
return obj instanceof DataFile;
protected Class<DataFile> elementClass() {
return DataFile.class;
}

/**
 * Custom serialization hook. {@link WrapperSet} is not {@link Serializable}, so the wrappers
 * returned by {@code set()} are written here explicitly: first the number of wrappers as an
 * int, then every wrapper in the set's iteration order.
 *
 * @param out The output stream to write to
 * @throws IOException in case the object can't be written
 */
private void writeObject(ObjectOutputStream out) throws IOException {
  final int count = set().size();
  out.writeInt(count);

  // each wrapper is written as its own object so that readObject can restore them one by one
  for (Wrapper<DataFile> element : set()) {
    out.writeObject(element);
  }
}

/**
 * Custom deserialization hook. {@link WrapperSet} is not {@link Serializable}, so the wrappers
 * are read here explicitly: first an int holding the number of wrappers, then that many
 * wrapper objects, each of which is added to {@code set()}.
 *
 * @param in The input stream to read from
 * @throws IOException in case the object can't be read
 * @throws ClassNotFoundException in case the class of the serialized object can't be found
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
  int remaining = in.readInt();
  while (remaining > 0) {
    // unchecked cast: the stream only carries Object, the element type is not verified here
    Wrapper<DataFile> element = (Wrapper<DataFile>) in.readObject();
    set().add(element);
    remaining--;
  }
}

/**
 * Renders this set as {@code DataFileSet({a, b, ...})}, where each entry is the string form of
 * one wrapper from {@code set()}, listed in the set's iteration order. An empty set renders as
 * {@code DataFileSet({})}.
 *
 * @return the bracketed, comma-separated rendering of this set
 */
@Override
public String toString() {
  // Collectors.joining expects (delimiter, prefix, suffix) -- the delimiter comes first
  return set().stream()
      .map(Object::toString)
      .collect(Collectors.joining(", ", "DataFileSet({", "})"));
}

private static class DataFileWrapper implements Wrapper<DataFile> {
private static class DataFileWrapper implements Wrapper<DataFile>, Serializable {
private DataFile file;

private DataFileWrapper(DataFile file) {
Expand Down Expand Up @@ -97,7 +141,7 @@ public boolean equals(Object o) {
return false;
}

return 0 == Comparators.charSequences().compare(file.location(), that.file.location());
return Objects.equals(file.location(), that.file.location());
}

@Override
Expand Down
66 changes: 55 additions & 11 deletions api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@
*/
package org.apache.iceberg.util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

public class DeleteFileSet extends WrapperSet<DeleteFile> {
public class DeleteFileSet extends WrapperSet<DeleteFile> implements Serializable {
private static final ThreadLocal<DeleteFileWrapper> WRAPPERS =
ThreadLocal.withInitial(() -> DeleteFileWrapper.wrap(null));

protected DeleteFileSet(Iterable<Wrapper<DeleteFile>> wrappers) {
private DeleteFileSet() {
// needed for serialization/deserialization
}

private DeleteFileSet(Iterable<Wrapper<DeleteFile>> wrappers) {
super(wrappers);
}

public static DeleteFileSet empty() {
public static DeleteFileSet create() {
return new DeleteFileSet(Sets.newLinkedHashSet());
}

public static DeleteFileSet of(Iterable<DeleteFile> iterable) {
return new DeleteFileSet(
Sets.newLinkedHashSet(Iterables.transform(iterable, DeleteFileWrapper::wrap)));
public static DeleteFileSet of(Iterable<? extends DeleteFile> iterable) {
return new DeleteFileSet(Iterables.transform(iterable, DeleteFileWrapper::wrap));
}

@Override
Expand All @@ -52,11 +59,48 @@ protected Wrapper<DeleteFile> wrap(DeleteFile deleteFile) {
}

@Override
protected boolean isInstance(Object obj) {
return obj instanceof DeleteFile;
protected Class<DeleteFile> elementClass() {
return DeleteFile.class;
}

/**
 * Custom serialization hook. {@link WrapperSet} is not {@link Serializable}, so the wrappers
 * returned by {@code set()} are written here explicitly: first the number of wrappers as an
 * int, then every wrapper in the set's iteration order.
 *
 * @param out The output stream to write to
 * @throws IOException in case the object can't be written
 */
private void writeObject(ObjectOutputStream out) throws IOException {
  final int count = set().size();
  out.writeInt(count);

  // each wrapper is written as its own object so that readObject can restore them one by one
  for (Wrapper<DeleteFile> element : set()) {
    out.writeObject(element);
  }
}

/**
 * Custom deserialization hook. {@link WrapperSet} is not {@link Serializable}, so the wrappers
 * are read here explicitly: first an int holding the number of wrappers, then that many
 * wrapper objects, each of which is added to {@code set()}.
 *
 * @param in The input stream to read from
 * @throws IOException in case the object can't be read
 * @throws ClassNotFoundException in case the class of the serialized object can't be found
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
  int remaining = in.readInt();
  while (remaining > 0) {
    // unchecked cast: the stream only carries Object, the element type is not verified here
    Wrapper<DeleteFile> element = (Wrapper<DeleteFile>) in.readObject();
    set().add(element);
    remaining--;
  }
}

/**
 * Renders this set as {@code DeleteFileSet({a, b, ...})}, where each entry is the string form
 * of one wrapper from {@code set()}, listed in the set's iteration order. An empty set renders
 * as {@code DeleteFileSet({})}.
 *
 * @return the bracketed, comma-separated rendering of this set
 */
@Override
public String toString() {
  // Collectors.joining expects (delimiter, prefix, suffix) -- the delimiter comes first
  return set().stream()
      .map(Object::toString)
      .collect(Collectors.joining(", ", "DeleteFileSet({", "})"));
}

private static class DeleteFileWrapper implements Wrapper<DeleteFile> {
private static class DeleteFileWrapper implements Wrapper<DeleteFile>, Serializable {
private DeleteFile file;

private DeleteFileWrapper(DeleteFile file) {
Expand Down Expand Up @@ -98,7 +142,7 @@ public boolean equals(Object o) {
}

// this needs to be updated once deletion vector support is added
return 0 == Comparators.charSequences().compare(file.location(), that.file.location());
return Objects.equals(file.location(), that.file.location());
}

@Override
Expand Down
65 changes: 31 additions & 34 deletions api/src/main/java/org/apache/iceberg/util/WrapperSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,46 @@
*/
package org.apache.iceberg.util;

import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

/**
* A custom set for a {@link Wrapper} of the given type that maintains insertion order
* A custom set for a {@link Wrapper} of the given type that maintains insertion order.
*
* @param <T> The type to wrap in a {@link Wrapper} instance.
*/
abstract class WrapperSet<T> implements Set<T>, Serializable {
private final Set<Wrapper<T>> set;
abstract class WrapperSet<T> implements Set<T> {
private final Set<Wrapper<T>> set = Sets.newLinkedHashSet();

protected WrapperSet(Iterable<Wrapper<T>> wrappers) {
this.set = Sets.newLinkedHashSet(wrappers);
wrappers.forEach(set::add);
}

protected WrapperSet() {}

protected abstract Wrapper<T> wrapper();

protected abstract Wrapper<T> wrap(T file);

protected abstract boolean isInstance(Object obj);
protected abstract Class<T> elementClass();

/**
 * Holder for a single value of type {@code T}. The backing set of the enclosing class stores
 * instances of this interface rather than the values themselves.
 *
 * @param <T> the type of the wrapped value
 */
protected interface Wrapper<T> {
/** Returns the wrapped value (presumably the one last passed to {@link #set} -- confirm). */
T get();

/**
 * Points this wrapper at the given object. Callers in the enclosing class pass the returned
 * wrapper straight to the backing set for lookups and afterwards call {@code set(null)} to drop
 * the reference again.
 *
 * @param object the value to wrap; {@code null} is passed to release the previous value
 * @return a wrapper for {@code object} (presumably this same instance -- confirm against
 *     implementations)
 */
Wrapper<T> set(T object);
}

/**
 * Gives subclasses access to the backing set of wrappers. The set itself is returned, not a
 * copy, so changes made through the returned reference affect this instance.
 *
 * @return the backing set of wrappers
 */
protected Set<Wrapper<T>> set() {
  return this.set;
}

@Override
public int size() {
return set.size();
Expand All @@ -65,7 +71,8 @@ public boolean isEmpty() {
@SuppressWarnings("unchecked")
@Override
public boolean contains(Object obj) {
if (isInstance(obj)) {
Preconditions.checkNotNull(obj, "Invalid object: null");
if (elementClass().isInstance(obj)) {
Wrapper<T> wrapper = wrapper();
boolean result = set.contains(wrapper.set((T) obj));
wrapper.set(null); // don't hold a reference to the value
Expand All @@ -92,13 +99,15 @@ public <X> X[] toArray(X[] destArray) {

/**
 * Wraps the given object via {@code wrap} and adds the resulting wrapper to the backing set.
 *
 * @param obj the object to add; must not be null
 * @return whatever the backing set reports for adding the wrapper
 * @throws NullPointerException if {@code obj} is null
 */
@Override
public boolean add(T obj) {
  Preconditions.checkNotNull(obj, "Invalid object: null");
  Wrapper<T> wrapped = wrap(obj);
  return set.add(wrapped);
}

@SuppressWarnings("unchecked")
@Override
public boolean remove(Object obj) {
if (isInstance(obj)) {
Preconditions.checkNotNull(obj, "Invalid object: null");
if (elementClass().isInstance(obj)) {
Wrapper<T> wrapper = wrapper();
boolean result = set.remove(wrapper.set((T) obj));
wrapper.set(null); // don't hold a reference to the value
Expand All @@ -110,46 +119,34 @@ public boolean remove(Object obj) {

@Override
public boolean containsAll(Collection<?> collection) {
if (null != collection) {
return Iterables.all(collection, this::contains);
}

return false;
Preconditions.checkNotNull(collection, "Invalid collection: null");
return Iterables.all(collection, this::contains);
}

@Override
public boolean addAll(Collection<? extends T> collection) {
if (null != collection) {
return Iterables.addAll(set, Iterables.transform(collection, this::wrap));
}

return false;
Preconditions.checkNotNull(collection, "Invalid collection: null");
return Iterables.addAll(set, Iterables.transform(collection, this::wrap));
}

@SuppressWarnings("unchecked")
@Override
public boolean retainAll(Collection<?> collection) {
if (null != collection) {
Set<Wrapper<T>> toRetain =
collection.stream()
.filter(this::isInstance)
.map(obj -> (T) obj)
.map(this::wrap)
.collect(Collectors.toSet());

return Iterables.retainAll(set, toRetain);
}
Preconditions.checkNotNull(collection, "Invalid collection: null");
Set<Wrapper<T>> toRetain =
collection.stream()
.filter(elementClass()::isInstance)
.map(obj -> (T) obj)
.map(this::wrap)
.collect(Collectors.toSet());

return false;
return Iterables.retainAll(set, toRetain);
}

@Override
public boolean removeAll(Collection<?> collection) {
if (null != collection) {
return collection.stream().filter(this::remove).count() != 0;
}

return false;
Preconditions.checkNotNull(collection, "Invalid collection: null");
return collection.stream().filter(this::remove).count() != 0;
}

@Override
Expand Down
Loading

0 comments on commit 07460b6

Please sign in to comment.