Skip to content

Commit

Permalink
HIVE-28536: Iceberg: automatically assign Iceberg tables for compaction on a custom compaction pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Fingerman committed Oct 16, 2024
1 parent b90fe82 commit 2110fc7
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ create table ice_orc (
)
partitioned by (company_id bigint)
stored by iceberg stored as orc
tblproperties ('format-version'='2');
tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg');

insert into ice_orc VALUES ('fn1','ln1', 1, 10, 100);
insert into ice_orc VALUES ('fn2','ln2', 1, 10, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ delete from ice_orc where last_name in ('ln1a', 'ln2a', 'ln7a');
select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc COMPACT 'major' and wait;
explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
alter table ice_orc COMPACT 'major' and wait pool 'iceberg';

select * from ice_orc;
describe formatted ice_orc;
Expand Down Expand Up @@ -85,8 +85,8 @@ delete from ice_orc where last_name in ('ln11a', 'ln12a', 'ln17a', 'ln18a');
select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc COMPACT 'major' and wait;
explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
alter table ice_orc COMPACT 'major' and wait pool 'iceberg';

select * from ice_orc;
describe formatted ice_orc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ delete from ice_orc where last_name in ('ln5a', 'ln6a', 'ln7a');
select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait;
explain optimize table ice_orc rewrite data;
explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg';
explain optimize table ice_orc rewrite data pool 'iceberg';

alter table ice_orc COMPACT 'major' and wait;
alter table ice_orc COMPACT 'major' and wait pool 'iceberg';

select * from ice_orc;
describe formatted ice_orc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ create table ice_orc (
last_name string
)
stored by iceberg stored as orc
tblproperties ('format-version'='2');
tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg');

insert into ice_orc VALUES
('fn1','ln1'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ PREHOOK: query: create table ice_orc (
)
partitioned by (company_id bigint)
stored by iceberg stored as orc
tblproperties ('format-version'='2')
tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc
Expand All @@ -18,7 +18,7 @@ POSTHOOK: query: create table ice_orc (
)
partitioned by (company_id bigint)
stored by iceberg stored as orc
tblproperties ('format-version'='2')
tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc
Expand Down Expand Up @@ -243,6 +243,7 @@ Table Parameters:
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
format-version 2
hive.compactor.worker.pool iceberg
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 14
Expand Down Expand Up @@ -344,6 +345,7 @@ Table Parameters:
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
format-version 2
hive.compactor.worker.pool iceberg
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 2
Expand Down Expand Up @@ -374,6 +376,6 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,11 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
Expand All @@ -237,14 +237,15 @@ STAGE PLANS:
compaction type: major
table name: default.ice_orc
numberOfBuckets: 0
pool: iceberg
table name: default.ice_orc
blocking: true

PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
PREHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
Expand Down Expand Up @@ -321,8 +322,8 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
PREHOOK: query: insert into ice_orc VALUES ('fn11','ln11', 1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
Expand Down Expand Up @@ -547,11 +548,11 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
Expand All @@ -564,14 +565,15 @@ STAGE PLANS:
compaction type: major
table name: default.ice_orc
numberOfBuckets: 0
pool: iceberg
table name: default.ice_orc
blocking: true

PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
PREHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
Expand Down Expand Up @@ -652,7 +654,7 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=1 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
Expand All @@ -229,14 +229,15 @@ STAGE PLANS:
compaction type: major
table name: default.ice_orc
numberOfBuckets: 0
pool: iceberg
table name: default.ice_orc
blocking: true

PREHOOK: query: explain optimize table ice_orc rewrite data
PREHOOK: query: explain optimize table ice_orc rewrite data pool 'iceberg'
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: explain optimize table ice_orc rewrite data
POSTHOOK: query: explain optimize table ice_orc rewrite data pool 'iceberg'
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
Expand All @@ -249,14 +250,15 @@ STAGE PLANS:
compaction type: major
table name: default.ice_orc
numberOfBuckets: 0
pool: iceberg
table name: default.ice_orc
blocking: true

PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
PREHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait pool 'iceberg'
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
Expand Down Expand Up @@ -327,4 +329,4 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ PREHOOK: query: create table ice_orc (
last_name string
)
stored by iceberg stored as orc
tblproperties ('format-version'='2')
tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc
Expand All @@ -12,7 +12,7 @@ POSTHOOK: query: create table ice_orc (
last_name string
)
stored by iceberg stored as orc
tblproperties ('format-version'='2')
tblproperties ('format-version'='2', 'hive.compactor.worker.pool'='iceberg')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc
Expand Down Expand Up @@ -83,6 +83,7 @@ Table Parameters:
current-snapshot-summary {\"added-position-delete-files\":\"1\",\"added-delete-files\":\"1\",\"added-files-size\":\"#Masked#\",\"added-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"7\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"1\",\"total-position-deletes\":\"3\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
format-version 2
hive.compactor.worker.pool iceberg
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 1
Expand Down Expand Up @@ -195,6 +196,7 @@ Table Parameters:
current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"1\",\"removed-position-delete-files\":\"1\",\"removed-delete-files\":\"1\",\"added-records\":\"4\",\"deleted-records\":\"7\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"1\",\"total-records\":\"4\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"1\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
format-version 2
hive.compactor.worker.pool iceberg
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 1
Expand Down Expand Up @@ -225,4 +227,4 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class TestIcebergLlapLocalCompactorCliDriver {

static CliAdapter adapter = new CliConfigs.IcebergLlapLocalCompactorCliConfig().getCliAdapter();
private static final AtomicBoolean stop = new AtomicBoolean();
private static final String defaultPoolName = null;
private static final String icebergPoolName = "iceberg";
private static Worker worker;

@Parameters(name ="{0}")
Expand All @@ -50,8 +52,14 @@ public static List<Object[]> getParameters() throws Exception {

/**
 * Starts the compactor workers used by this driver before any test runs.
 *
 * Two workers are started through setupWorker: one with defaultPoolName
 * (declared as null in this class) and one with icebergPoolName ("iceberg").
 *
 * NOTE(review): setupWorker assigns each new Worker to the single static
 * `worker` field, so after this method returns the field references only the
 * worker created by the second call. Both workers are initialised with the
 * same `stop` flag. Confirm that the class teardown (not visible here) does
 * not need a reference to the first worker.
 *
 * @throws Exception if starting either worker fails
 */
@BeforeClass
public static void setup() throws Exception {
// Worker with a null pool name; presumably serves the default pool - TODO confirm.
setupWorker(defaultPoolName);
// Worker bound to the "iceberg" pool.
setupWorker(icebergPoolName);
}

private static void setupWorker(String poolName) throws Exception {
worker = new Worker();
worker.setConf(SessionState.get().getConf());
worker.setPoolName(poolName);
stop.set(false);
worker.init(stop);
worker.start();
Expand All @@ -78,6 +86,7 @@ public TestIcebergLlapLocalCompactorCliDriver(String name, File qfile) {

/**
 * Runs one parameterized q-file through the CLI adapter.
 *
 * Calls adapter.setUp() and then adapter.runTest(name, qfile), so setUp is
 * invoked once per q-file, immediately before that file is executed.
 * NOTE(review): what setUp resets is not visible in this file; presumably
 * per-test session/driver state - confirm against CliAdapter.
 *
 * @throws Exception if the adapter set-up or the q-file run fails
 */
@Test
public void testCliDriver() throws Exception {
adapter.setUp();
adapter.runTest(name, qfile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ optimizeTableStatementSuffix
optimizeTblRewriteDataSuffix
@init { gParent.msgs.push("compaction request"); }
@after { gParent.msgs.pop(); }
: KW_REWRITE KW_DATA orderByClause? whereClause?
-> ^(TOK_ALTERTABLE_COMPACT Identifier["'MAJOR'"] TOK_BLOCKING orderByClause? whereClause?)
: KW_REWRITE KW_DATA orderByClause? whereClause? compactPool?
-> ^(TOK_ALTERTABLE_COMPACT Identifier["'MAJOR'"] TOK_BLOCKING orderByClause? whereClause? compactPool?)
;

alterStatementPartitionKeyType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,24 @@ public void testOptimizeTableWithOrderBy() throws Exception {
" optimize table tbl0 rewrite data order by col01 desc", null).getTree();
assertThat(tree.dump(), is(EXPECTED_ORDER_BY));
}

/**
 * Verifies that "optimize table ... rewrite data pool '<name>'" parses into a
 * blocking major ALTER TABLE ... COMPACT tree that carries the pool name in a
 * TOK_COMPACT_POOL node.
 */
@Test
public void testOptimizeTableWithPool() throws Exception {
  // Parse the statement first; the expected dump below does not depend on it.
  ASTNode parsedTree = parseDriver.parse(
      " optimize table tbl0 rewrite data pool 'iceberg'", null).getTree();

  // Expected AST dump, one node per line.
  String expectedDump = "\n" +
      "nil\n" +
      " TOK_ALTERTABLE\n" +
      " TOK_TABNAME\n" +
      " tbl0\n" +
      " TOK_ALTERTABLE_COMPACT\n" +
      " 'MAJOR'\n" +
      " TOK_BLOCKING\n" +
      " TOK_COMPACT_POOL\n" +
      " 'iceberg'\n" +
      " <EOF>\n";

  String actualDump = parsedTree.dump();
  assertThat(actualDump, is(expectedDump));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ else if (desc.getPartitionSpec() != null) {
CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(),
compactionTypeStr2ThriftType(desc.getCompactionType()));

compactionRequest.setPoolName(desc.getPoolName());
compactionRequest.setPoolName(desc.getPoolName() != null ? desc.getPoolName() :
CompactorUtil.getCompactPoolName(context.getConf(), table));
compactionRequest.setProperties(desc.getProperties());
compactionRequest.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
compactionRequest.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.StringableMap;
import org.apache.hadoop.hive.ql.ddl.DDLUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.shims.HadoopShims;
Expand Down Expand Up @@ -565,4 +566,16 @@ public static Map<String, Integer> getPoolConf(HiveConf hiveConf) {
}
return poolConf;
}

/**
 * Looks up the compaction worker pool configured for a table.
 *
 * The table's own parameters are checked first for
 * Constants.HIVE_COMPACTOR_WORKER_POOL. If that value is missing or blank,
 * the parameters of the table's database (resolved through
 * CompactorUtil.resolveDatabase) are consulted instead, and whatever value is
 * stored there is returned as-is.
 *
 * @param conf  configuration passed to the database lookup
 * @param table table whose pool assignment is being resolved
 * @return the table-level pool name when it is not blank; otherwise the
 *         database-level value, or null when the database has no parameters
 *         or no such key
 * @throws Exception declared by the signature; the database lookup is the
 *         only call here that is not a plain getter
 */
public static String getCompactPoolName(HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table)
    throws Exception {
  Map<String, String> tableParams = table.getParameters();
  String tablePool = tableParams != null ? tableParams.get(Constants.HIVE_COMPACTOR_WORKER_POOL) : null;
  if (!StringUtils.isBlank(tablePool)) {
    // A table-level setting takes precedence; the database is not consulted.
    return tablePool;
  }

  // Fall back to the pool configured on the owning database, if any.
  Map<String, String> dbParams = CompactorUtil.resolveDatabase(conf, table.getDbName()).getParameters();
  if (dbParams == null) {
    return null;
  }
  return dbParams.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
}
}

0 comments on commit 2110fc7

Please sign in to comment.