Skip to content

Commit

Permalink
API, Core: Track partition statistics in TableMetadata (#8502)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored Dec 18, 2023
1 parent ad3cf9d commit 6e21bbf
Show file tree
Hide file tree
Showing 25 changed files with 1,000 additions and 14 deletions.
36 changes: 36 additions & 0 deletions api/src/main/java/org/apache/iceberg/PartitionStatisticsFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

/**
* Represents a partition statistics file that can be used to read table data more efficiently.
*
* <p>Statistics are informational. A reader can choose to ignore statistics information. Statistics
* support is not required to read the table correctly.
*
* <p>A file is described by three values: the snapshot it is associated with, its path, and its
* size in bytes.
*/
public interface PartitionStatisticsFile {
/**
* Returns the ID of the Iceberg table snapshot that the partition statistics file is associated
* with.
*/
long snapshotId();

/** Returns the fully qualified path to the file. Never null. */
String path();

/** Returns the size of the partition statistics file in bytes. */
long fileSizeInBytes();
}
17 changes: 17 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

/** Represents a table. */
public interface Table {
Expand Down Expand Up @@ -286,6 +287,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
 * Starts a new {@link UpdatePartitionStatistics update partition statistics} operation that can
 * add or remove partition statistics files in this table.
 *
 * <p>This default implementation does not support the operation and always throws.
 *
 * @return a new {@link UpdatePartitionStatistics}
 * @throws UnsupportedOperationException always, in this default implementation
 */
default UpdatePartitionStatistics updatePartitionStatistics() {
  // Name the concrete class in the message so the unsupported implementation is identifiable.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      "Updating partition statistics is not supported by " + implementationName);
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table and commit.
*
Expand Down Expand Up @@ -327,6 +339,11 @@ default UpdateStatistics updateStatistics() {
*/
List<StatisticsFile> statisticsFiles();

/**
 * Returns the current partition statistics files for the table.
 *
 * @return an empty list in this default implementation
 */
default List<PartitionStatisticsFile> partitionStatisticsFiles() {
  List<PartitionStatisticsFile> noFiles = ImmutableList.of();
  return noFiles;
}

/**
* Returns the current refs for the table
*
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ default UpdateStatistics updateStatistics() {
"Updating statistics is not supported by " + getClass().getName());
}

/**
 * Starts a new {@link UpdatePartitionStatistics update partition statistics} operation that can
 * add or remove partition statistics files in this table.
 *
 * <p>This default implementation does not support the operation and always throws.
 *
 * @return a new {@link UpdatePartitionStatistics}
 * @throws UnsupportedOperationException always, in this default implementation
 */
default UpdatePartitionStatistics updatePartitionStatistics() {
  // Name the concrete class in the message so the unsupported implementation is identifiable.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      "Updating partition statistics is not supported by " + implementationName);
}

/**
* Create a new {@link ExpireSnapshots expire API} to manage snapshots in this table.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;

/**
* API for updating partition statistics files in a table.
*
* <p>Each method returns this instance so that calls can be chained.
*/
public interface UpdatePartitionStatistics extends PendingUpdate<List<PartitionStatisticsFile>> {
/**
* Set the table's partition statistics file for the given snapshot, replacing the previous
* partition statistics file for the snapshot if any exists.
*
* @param partitionStatisticsFile the partition statistics file to set
* @return this for method chaining
*/
UpdatePartitionStatistics setPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile);

/**
* Remove the table's partition statistics file for the given snapshot.
*
* @param snapshotId ID of the snapshot whose partition statistics file should be removed
* @return this for method chaining
*/
UpdatePartitionStatistics removePartitionStatistics(long snapshotId);
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ public List<StatisticsFile> statisticsFiles() {
return ImmutableList.of();
}

/** Reports no partition statistics files: the returned list is always empty. */
@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
  List<PartitionStatisticsFile> noFiles = ImmutableList.of();
  return noFiles;
}

@Override
public Map<String, SnapshotRef> refs() {
return table().refs();
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseReadOnlyTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public UpdateStatistics updateStatistics() {
"Cannot update statistics of a " + descriptor + " table");
}

/**
 * Rejects partition statistics updates for this table.
 *
 * @throws UnsupportedOperationException always; the message includes this table's descriptor
 */
@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
  String message = "Cannot update partition statistics of a " + descriptor + " table";
  throw new UnsupportedOperationException(message);
}

@Override
public ExpireSnapshots expireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ public UpdateStatistics updateStatistics() {
return new SetStatistics(ops);
}

/** Creates a new {@code SetPartitionStatistics} update over this table's operations. */
@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
  UpdatePartitionStatistics update = new SetPartitionStatistics(ops);
  return update;
}

@Override
public ExpireSnapshots expireSnapshots() {
return new RemoveSnapshots(ops);
Expand Down Expand Up @@ -255,6 +260,11 @@ public List<StatisticsFile> statisticsFiles() {
return ops.current().statisticsFiles();
}

/** Returns the partition statistics files listed by the current table metadata. */
@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
  List<PartitionStatisticsFile> files = ops.current().partitionStatisticsFiles();
  return files;
}

@Override
public Map<String, SnapshotRef> refs() {
return ops.current().refs();
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ public UpdateStatistics updateStatistics() {
return updateStatistics;
}

/**
 * Creates a new partition statistics update backed by the transaction's operations and records
 * it in this transaction's list of updates.
 *
 * @return the newly created update
 */
@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
  // NOTE(review): presumably fails when the previous operation is still uncommitted -- confirm
  // against checkLastOperationCommitted.
  checkLastOperationCommitted("UpdatePartitionStatistics");

  UpdatePartitionStatistics pendingUpdate = new SetPartitionStatistics(transactionOps);
  updates.add(pendingUpdate);
  return pendingUpdate;
}

@Override
public ExpireSnapshots expireSnapshots() {
checkLastOperationCommitted("ExpireSnapshots");
Expand Down Expand Up @@ -733,6 +742,11 @@ public UpdateStatistics updateStatistics() {
return BaseTransaction.this.updateStatistics();
}

/** Delegates to the enclosing {@code BaseTransaction}'s partition statistics update. */
@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
  UpdatePartitionStatistics update = BaseTransaction.this.updatePartitionStatistics();
  return update;
}

@Override
public ExpireSnapshots expireSnapshots() {
return BaseTransaction.this.expireSnapshots();
Expand Down Expand Up @@ -769,6 +783,11 @@ public List<StatisticsFile> statisticsFiles() {
return current.statisticsFiles();
}

/** Returns the partition statistics files listed by {@code current}. */
@Override
public List<PartitionStatisticsFile> partitionStatisticsFiles() {
  List<PartitionStatisticsFile> files = current.partitionStatisticsFiles();
  return files;
}

@Override
public Map<String, SnapshotRef> refs() {
return current.refs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public UpdateStatistics updateStatistics() {
return wrapped.updateStatistics();
}

/** Forwards the request to the wrapped instance and returns its result unchanged. */
@Override
public UpdatePartitionStatistics updatePartitionStatistics() {
  UpdatePartitionStatistics update = wrapped.updatePartitionStatistics();
  return update;
}

@Override
public ExpireSnapshots expireSnapshots() {
return wrapped.expireSnapshots();
Expand Down
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -86,6 +85,11 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
.run(deleteFunc::accept);
}

/**
 * Tells whether the given metadata lists at least one statistics file or at least one partition
 * statistics file.
 *
 * @param tableMetadata the table metadata to inspect
 * @return true if either list of statistics files is non-empty
 */
protected boolean hasAnyStatisticsFiles(TableMetadata tableMetadata) {
  if (!tableMetadata.statisticsFiles().isEmpty()) {
    return true;
  }

  // Partition statistics files are only consulted when there are no statistics files.
  return !tableMetadata.partitionStatisticsFiles().isEmpty();
}

protected Set<String> expiredStatisticsFilesLocations(
TableMetadata beforeExpiration, TableMetadata afterExpiration) {
Set<String> statsFileLocationsBeforeExpiration = statsFileLocations(beforeExpiration);
Expand All @@ -98,10 +102,15 @@ private Set<String> statsFileLocations(TableMetadata tableMetadata) {
Set<String> statsFileLocations = Sets.newHashSet();

if (tableMetadata.statisticsFiles() != null) {
statsFileLocations =
tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.collect(Collectors.toSet());
tableMetadata.statisticsFiles().stream()
.map(StatisticsFile::path)
.forEach(statsFileLocations::add);
}

if (tableMetadata.partitionStatisticsFiles() != null) {
tableMetadata.partitionStatisticsFiles().stream()
.map(PartitionStatisticsFile::path)
.forEach(statsFileLocations::add);
}

return statsFileLocations;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.immutables.value.Value;

/**
* Sub-interface of {@link PartitionStatisticsFile} annotated with {@code @Value.Immutable}; it
* declares no members of its own.
*
* <p>NOTE(review): presumably the Immutables annotation processor generates the concrete
* implementation from this declaration -- confirm against the build setup.
*/
@Value.Immutable
public interface GenericPartitionStatisticsFile extends PartitionStatisticsFile {}
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
deleteFiles(manifestsToDelete, "manifest");
deleteFiles(manifestListsToDelete, "manifest list");

if (!beforeExpiration.statisticsFiles().isEmpty()) {
if (hasAnyStatisticsFiles(beforeExpiration)) {
Set<String> expiredStatisticsFilesLocations =
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,44 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

/** Metadata update that sets a partition statistics file on a table metadata builder. */
class SetPartitionStatistics implements MetadataUpdate {
  private final PartitionStatisticsFile statsFile;

  /**
   * Creates the update.
   *
   * @param partitionStatisticsFile the partition statistics file this update carries
   */
  public SetPartitionStatistics(PartitionStatisticsFile partitionStatisticsFile) {
    this.statsFile = partitionStatisticsFile;
  }

  /** Returns the partition statistics file carried by this update. */
  public PartitionStatisticsFile partitionStatisticsFile() {
    return statsFile;
  }

  /** Returns the snapshot ID reported by the carried partition statistics file. */
  public long snapshotId() {
    return statsFile.snapshotId();
  }

  /** Passes the carried file to {@code setPartitionStatistics} on the given builder. */
  @Override
  public void applyTo(TableMetadata.Builder metadataBuilder) {
    metadataBuilder.setPartitionStatistics(statsFile);
  }
}

/** Metadata update that removes the partition statistics for one snapshot ID. */
class RemovePartitionStatistics implements MetadataUpdate {
  private final long targetSnapshotId;

  /**
   * Creates the update.
   *
   * @param snapshotId ID of the snapshot whose partition statistics should be removed
   */
  public RemovePartitionStatistics(long snapshotId) {
    this.targetSnapshotId = snapshotId;
  }

  /** Passes the snapshot ID to {@code removePartitionStatistics} on the given builder. */
  @Override
  public void applyTo(TableMetadata.Builder metadataBuilder) {
    metadataBuilder.removePartitionStatistics(targetSnapshotId);
  }

  /** Returns the snapshot ID this update was created with. */
  public long snapshotId() {
    return targetSnapshotId;
  }
}

class AddSnapshot implements MetadataUpdate {
private final Snapshot snapshot;

Expand Down
Loading

0 comments on commit 6e21bbf

Please sign in to comment.