Skip to content

Commit

Permalink
HIVE-28536: Iceberg: automatically assign Iceberg tables for compacti…
Browse files Browse the repository at this point in the history
…on on a custom compaction pool.
  • Loading branch information
Dmitriy Fingerman committed Oct 11, 2024
1 parent b90fe82 commit 447150a
Show file tree
Hide file tree
Showing 28 changed files with 258 additions and 52 deletions.
4 changes: 3 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,9 @@ public static enum ConfVars {
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),

HIVE_ICEBERG_COMPACTOR_POOL_NAME("hive.iceberg.compactor.pool.name", "default",
"Compaction pool name for Iceberg tables. When the pool name is not default, it is also necessary " +
"to define a corresponding config in a form of 'hive.compactor.worker.{poolName}.threads'"),
HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
Expand Down
5 changes: 5 additions & 0 deletions data/conf/iceberg/llap/hive-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@
<property>
<name>hive.txn.xlock.ctas</name>
<value>false</value>
</property>

<property>
<name>hive.iceberg.compactor.pool.name</name>
<value>iceberg</value>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,6 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -259,5 +259,5 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,6 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=3 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=3 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc event_src_trunc=AAA/event_time_month=2024-08 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc event_src_trunc=AAA/event_time_month=2024-09 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc event_src_trunc=BBB/event_time_month=2024-07 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc event_src_trunc=BBB/event_time_month=2024-08 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc event_src_trunc=AAA/event_time_month=2024-08 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc event_src_trunc=AAA/event_time_month=2024-09 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc event_src_trunc=BBB/event_time_month=2024-07 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc event_src_trunc=BBB/event_time_month=2024-08 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=4 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=4 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
PREHOOK: query: insert into ice_orc VALUES ('fn11','ln11', 1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
Expand Down Expand Up @@ -652,7 +652,7 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
1 default x --- MAJOR succeeded #Masked# manual default 0 0 0 ---
1 default x --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
PREHOOK: query: desc formatted x
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,5 +359,5 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc_wo_evo dept_id=1/city=London/registration_date=2024-03-11 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc_wo_evo dept_id=1/city=London/registration_date=2024-03-11 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
PREHOOK: query: explain alter table ice_orc_wo_evo PARTITION (dept_id=2, city='Paris', registration_date='2024-02-16') COMPACT 'major' and wait
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc_wo_evo
Expand Down Expand Up @@ -447,5 +447,5 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc_wo_evo dept_id=1/city=London/registration_date=2024-03-11 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc_wo_evo dept_id=2/city=Paris/registration_date=2024-02-16 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc_wo_evo dept_id=1/city=London/registration_date=2024-03-11 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc_wo_evo dept_id=2/city=Paris/registration_date=2024-02-16 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=1/city=London MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1/city=London/registration_date=2024-03-11 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2/city=Paris MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2/city=Paris/registration_date=2024-02-16 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1/city=London MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=1/city=London/registration_date=2024-03-11 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2/city=Paris MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2/city=Paris/registration_date=2024-02-16 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -448,5 +448,5 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc a=a MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc b=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc a=a MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc b=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,4 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,4 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,4 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class TestIcebergLlapLocalCompactorCliDriver {

static CliAdapter adapter = new CliConfigs.IcebergLlapLocalCompactorCliConfig().getCliAdapter();
private static final AtomicBoolean stop = new AtomicBoolean();
private static final String icebergCompactionPoolName = "iceberg";
private static Worker worker;

@Parameters(name ="{0}")
Expand All @@ -52,6 +53,7 @@ public static List<Object[]> getParameters() throws Exception {
public static void setup() throws Exception {
worker = new Worker();
worker.setConf(SessionState.get().getConf());
worker.setPoolName(icebergCompactionPoolName);
stop.set(false);
worker.init(stop);
worker.start();
Expand All @@ -78,6 +80,7 @@ public TestIcebergLlapLocalCompactorCliDriver(String name, File qfile) {

/**
 * Runs the q-file supplied to this parameterized test instance through the shared CLI adapter.
 *
 * @throws Exception if adapter set-up or q-file execution fails
 */
@Test
public void testCliDriver() throws Exception {
// Explicitly invoke the adapter's set-up before executing this q-file.
adapter.setUp();
adapter.runTest(name, qfile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.ddl.table.storage.compact;

import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

Expand All @@ -27,6 +28,7 @@
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
Expand Down Expand Up @@ -88,7 +90,8 @@ else if (desc.getPartitionSpec() != null) {
CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(),
compactionTypeStr2ThriftType(desc.getCompactionType()));

compactionRequest.setPoolName(desc.getPoolName());
compactionRequest.setPoolName(desc.getPoolName() != null ? desc.getPoolName() :
DDLUtils.isIcebergTable(table) ? CompactorUtil.getIcebergCompactPoolName(context.getConf()) : null);
compactionRequest.setProperties(desc.getProperties());
compactionRequest.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
compactionRequest.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,4 +565,12 @@ public static Map<String, Integer> getPoolConf(HiveConf hiveConf) {
}
return poolConf;
}

/**
 * Resolves the compaction pool name configured for Iceberg tables.
 *
 * @param conf configuration from which {@code HIVE_ICEBERG_COMPACTOR_POOL_NAME} is read
 * @return the configured pool name, or {@code null} when the configured value equals
 *         {@link Constants#COMPACTION_DEFAULT_POOL}; a {@code null} configured value is
 *         returned as-is
 */
public static String getIcebergCompactPoolName(HiveConf conf) {
  String configuredPool = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_COMPACTOR_POOL_NAME);
  // Anything other than the default pool name (including an unset value) is passed through untouched.
  if (configuredPool == null || !configuredPool.equals(Constants.COMPACTION_DEFAULT_POOL)) {
    return configuredPool;
  }
  // The default pool is represented as "no explicit pool".
  return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.ql.ddl.DDLUtils;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.txn.compactor.service.CompactionService;
import org.apache.hadoop.hive.ql.txn.compactor.service.CompactionExecutorFactory;
Expand Down Expand Up @@ -192,11 +194,11 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool
}

try {

FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
findNextCompactRequest.setWorkerId(workerName);
findNextCompactRequest.setWorkerVersion(runtimeVersion);
findNextCompactRequest.setPoolName(this.getPoolName());
findNextCompactRequest.setIcebergPoolName(CompactorUtil.getIcebergCompactPoolName(conf));
ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(findNextCompactRequest));
LOG.info("Processing compaction request {}", ci);

Expand Down
Loading

0 comments on commit 447150a

Please sign in to comment.