[POC] Demonstrate another way for temp table compression codec
wenleix committed Apr 15, 2019
1 parent b881748 commit 592f5bb
Showing 11 changed files with 37 additions and 2 deletions.
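In short, this POC threads a new temporaryTable flag from the writable table handles (HiveOutputTableHandle and HiveInsertTableHandle, via their shared base HiveWritableTableHandle) through HivePageSinkProvider and HiveWriterFactory into every HiveFileWriterFactory.createFileWriter call. OrcFileWriterFactory then uses the flag to pin the compression codec for temporary tables to SNAPPY, while RcFileFileWriterFactory and the RecordFileWriter fallback in HiveWriterFactory assert the flag is never set, so temporary tables always take the optimized ORC write path.

A minimal, self-contained sketch of the codec rule this commit introduces (not the commit's code: the stripped-down CompressionKind enum and the final ZLIB fallback are stand-ins, and "orc.compress" assumes that is what OrcTableProperties.COMPRESSION.getPropName() resolves to):

import java.util.Properties;

// Stand-in for Presto's CompressionKind; only the values used here.
enum CompressionKind { NONE, SNAPPY, ZLIB }

final class TempTableCompressionRule
{
    // Precedence mirrored from the patched getCompression():
    // temporary table -> SNAPPY, else table property, else site default.
    static CompressionKind choose(Properties schema, Properties conf, boolean temporaryTable)
    {
        if (temporaryTable) {
            return CompressionKind.SNAPPY;
        }
        String name = schema.getProperty("orc.compress");
        if (name == null) {
            name = conf.getProperty("hive.exec.orc.default.compress", "ZLIB"); // assumed fallback
        }
        return CompressionKind.valueOf(name.toUpperCase());
    }

    public static void main(String[] args)
    {
        Properties empty = new Properties();
        System.out.println(choose(empty, empty, true));   // SNAPPY
        System.out.println(choose(empty, empty, false));  // ZLIB
    }
}

File by file, the hunks: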
@@ -26,6 +26,7 @@ public interface HiveFileWriterFactory
 {
     Optional<HiveFileWriter> createFileWriter(
             Path path,
+            boolean temporaryTable,
             List<String> inputColumnNames,
             StorageFormat storageFormat,
             Properties schema,
@@ -34,6 +34,7 @@ public HiveInsertTableHandle(
             @JsonProperty("pageSinkMetadata") HivePageSinkMetadata pageSinkMetadata,
             @JsonProperty("locationHandle") LocationHandle locationHandle,
             @JsonProperty("bucketProperty") Optional<HiveBucketProperty> bucketProperty,
+            @JsonProperty("temporaryTable") boolean temporaryTable,
             @JsonProperty("tableStorageFormat") HiveStorageFormat tableStorageFormat,
             @JsonProperty("partitionStorageFormat") HiveStorageFormat partitionStorageFormat)
     {
@@ -45,6 +46,7 @@ public HiveInsertTableHandle(
                 pageSinkMetadata,
                 locationHandle,
                 bucketProperty,
+                temporaryTable,
                 tableStorageFormat,
                 partitionStorageFormat);
     }
@@ -1122,6 +1122,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
                 partitionStorageFormat,
                 partitionedBy,
                 bucketProperty,
+                false,
                 session.getUser(),
                 tableProperties);
 
@@ -1382,6 +1383,7 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
                 metastore.generatePageSinkMetadata(tableName),
                 locationHandle,
                 table.get().getStorage().getBucketProperty(),
+                table.get().getTableType().equals(TEMPORARY_TABLE),
                 tableStorageFormat,
                 isRespectTableFormat(session) ? tableStorageFormat : HiveSessionProperties.getHiveStorageFormat(session));
 
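Note the asymmetry between the two call sites above: beginCreateTable passes a hard-coded false, so CTAS targets are never treated as temporary in this POC, while beginInsert derives the flag from the metastore table type via table.get().getTableType().equals(TEMPORARY_TABLE).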
@@ -46,6 +46,7 @@ public HiveOutputTableHandle(
             @JsonProperty("partitionStorageFormat") HiveStorageFormat partitionStorageFormat,
             @JsonProperty("partitionedBy") List<String> partitionedBy,
             @JsonProperty("bucketProperty") Optional<HiveBucketProperty> bucketProperty,
+            @JsonProperty("temporaryTable") boolean temporaryTable,
             @JsonProperty("tableOwner") String tableOwner,
             @JsonProperty("additionalTableParameters") Map<String, String> additionalTableParameters)
     {
@@ -57,6 +58,7 @@ public HiveOutputTableHandle(
                 pageSinkMetadata,
                 locationHandle,
                 bucketProperty,
+                temporaryTable,
                 tableStorageFormat,
                 partitionStorageFormat);
 
@@ -134,6 +134,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
                 handle.getSchemaName(),
                 handle.getTableName(),
                 isCreateTable,
+                handle.isTemporaryTable(),
                 handle.getInputColumns(),
                 handle.getTableStorageFormat(),
                 handle.getPartitionStorageFormat(),
@@ -31,6 +31,7 @@ public class HiveWritableTableHandle
     private HivePageSinkMetadata pageSinkMetadata;
     private final LocationHandle locationHandle;
     private final Optional<HiveBucketProperty> bucketProperty;
+    private final boolean temporaryTable;
     private final HiveStorageFormat tableStorageFormat;
     private final HiveStorageFormat partitionStorageFormat;
 
@@ -42,6 +43,7 @@ public HiveWritableTableHandle(
             HivePageSinkMetadata pageSinkMetadata,
             LocationHandle locationHandle,
             Optional<HiveBucketProperty> bucketProperty,
+            boolean temporaryTable,
             HiveStorageFormat tableStorageFormat,
             HiveStorageFormat partitionStorageFormat)
     {
@@ -52,6 +54,7 @@ public HiveWritableTableHandle(
         this.pageSinkMetadata = requireNonNull(pageSinkMetadata, "pageSinkMetadata is null");
         this.locationHandle = requireNonNull(locationHandle, "locationHandle is null");
         this.bucketProperty = requireNonNull(bucketProperty, "bucketProperty is null");
+        this.temporaryTable = temporaryTable;
         this.tableStorageFormat = requireNonNull(tableStorageFormat, "tableStorageFormat is null");
         this.partitionStorageFormat = requireNonNull(partitionStorageFormat, "partitionStorageFormat is null");
     }
@@ -98,6 +101,12 @@ public Optional<HiveBucketProperty> getBucketProperty()
         return bucketProperty;
     }
 
+    @JsonProperty
+    public boolean isTemporaryTable()
+    {
+        return temporaryTable;
+    }
+
     @JsonProperty
     public HiveStorageFormat getTableStorageFormat()
     {
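Because the flag lives on HiveWritableTableHandle and is exposed through a @JsonProperty getter, it rides along when the handle is serialized and shipped to workers. A toy round trip with plain Jackson (the class name is illustrative; Presto wraps this machinery in JsonCodec):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Toy handle carrying only the new field, to show why the
// @JsonProperty getter above matters for coordinator/worker traffic.
class ToyWritableHandle
{
    private final boolean temporaryTable;

    @JsonCreator
    public ToyWritableHandle(@JsonProperty("temporaryTable") boolean temporaryTable)
    {
        this.temporaryTable = temporaryTable;
    }

    @JsonProperty
    public boolean isTemporaryTable()
    {
        return temporaryTable;
    }

    public static void main(String[] args) throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new ToyWritableHandle(true));
        System.out.println(json); // {"temporaryTable":true}
        System.out.println(mapper.readValue(json, ToyWritableHandle.class).isTemporaryTable()); // true
    }
}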
@@ -70,6 +70,7 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
+import static com.facebook.presto.hive.HiveSessionProperties.isTemporaryStagingDirectoryEnabled;
 import static com.facebook.presto.hive.HiveSessionProperties.isWritingStagingFilesEnabled;
 import static com.facebook.presto.hive.HiveType.toHiveTypes;
 import static com.facebook.presto.hive.HiveWriteUtils.createPartitionValues;
@@ -104,6 +105,7 @@ public class HiveWriterFactory
     private final Set<HiveFileWriterFactory> fileWriterFactories;
     private final String schemaName;
     private final String tableName;
+    private final boolean temporaryTable;
 
     private final List<DataColumn> dataColumns;
 
@@ -145,6 +147,7 @@ public HiveWriterFactory(
             String schemaName,
             String tableName,
             boolean isCreateTable,
+            boolean isTemporaryTable,
             List<HiveColumnHandle> inputColumns,
             HiveStorageFormat tableStorageFormat,
             HiveStorageFormat partitionStorageFormat,
@@ -256,6 +259,7 @@ public HiveWriterFactory(
         this.hiveWriterStats = requireNonNull(hiveWriterStats, "hiveWriterStats is null");
 
         this.orcFileWriterFactory = requireNonNull(orcFileWriterFactory, "orcFileWriterFactory is null");
+        this.temporaryTable = isTemporaryTable;
     }
 
     public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt bucketNumber)
@@ -446,6 +450,7 @@ else if (insertExistingPartitionsBehavior == InsertExistingPartitionsBehavior.ER
         for (HiveFileWriterFactory fileWriterFactory : fileWriterFactories) {
             Optional<HiveFileWriter> fileWriter = fileWriterFactory.createFileWriter(
                     path,
+                    temporaryTable,
                     dataColumns.stream()
                             .map(DataColumn::getName)
                             .collect(toList()),
@@ -460,6 +465,8 @@ else if (insertExistingPartitionsBehavior == InsertExistingPartitionsBehavior.ER
         }
 
         if (hiveFileWriter == null) {
+            checkArgument(!this.temporaryTable);
+
             hiveFileWriter = new RecordFileWriter(
                     path,
                     dataColumns.stream()
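The checkArgument(!this.temporaryTable) before the RecordFileWriter fallback is the enforcement point: if no HiveFileWriterFactory produced a writer for a temporary table, the factory fails fast rather than silently falling back to the generic Hive record-writer path, which would ignore the codec override.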
@@ -123,6 +123,7 @@ public OrcWriterStats getStats()
     @Override
     public Optional<HiveFileWriter> createFileWriter(
             Path path,
+            boolean temporaryTable,
             List<String> inputColumnNames,
             StorageFormat storageFormat,
             Properties schema,
@@ -144,7 +145,7 @@ else if (com.facebook.hive.orc.OrcOutputFormat.class.getName().equals(storageFor
             return Optional.empty();
         }
 
-        CompressionKind compression = getCompression(schema, configuration, orcEncoding);
+        CompressionKind compression = getCompression(schema, configuration, orcEncoding, temporaryTable);
 
         // existing tables and partitions may have columns in a different order than the writer is providing, so build
         // an index to rearrange columns in the proper order
@@ -223,8 +224,12 @@ protected OrcDataSink createOrcDataSink(ConnectorSession session, FileSystem fil
         return new OutputStreamOrcDataSink(fileSystem.create(path));
     }
 
-    private static CompressionKind getCompression(Properties schema, JobConf configuration, OrcEncoding orcEncoding)
+    private static CompressionKind getCompression(Properties schema, JobConf configuration, OrcEncoding orcEncoding, boolean temporaryTable)
     {
+        if (temporaryTable) {
+            return CompressionKind.SNAPPY;
+        }
+
         String compressionName = schema.getProperty(OrcTableProperties.COMPRESSION.getPropName());
         if (compressionName == null) {
             compressionName = configuration.get("hive.exec.orc.default.compress");
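Pinning temporary tables to SNAPPY here, ahead of both the table property and the hive.exec.orc.default.compress site default, is the crux of the POC: the codec choice for temp tables no longer depends on per-table configuration. The commit does not spell out the rationale, but SNAPPY's cheap compression and decompression is a natural fit for short-lived intermediate data that is written once and read back almost immediately.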
@@ -47,6 +47,7 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED;
 import static com.facebook.presto.hive.HiveType.toHiveTypes;
 import static com.facebook.presto.hive.rcfile.RcFilePageSourceFactory.createTextVectorEncoding;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMNS;
@@ -89,12 +90,15 @@ public RcFileFileWriterFactory(
     @Override
     public Optional<HiveFileWriter> createFileWriter(
             Path path,
+            boolean temporaryTable,
             List<String> inputColumnNames,
             StorageFormat storageFormat,
             Properties schema,
             JobConf configuration,
             ConnectorSession session)
     {
+        checkArgument(!temporaryTable);
+
         if (!HiveSessionProperties.isRcfileOptimizedWriterEnabled(session)) {
             return Optional.empty();
         }
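Together with the fallback guard in HiveWriterFactory, this checkArgument(!temporaryTable) means the RCFile writer simply opts out of temporary tables: of the writer factories touched by this commit, only the ORC/DWRF factory accepts the flag.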
@@ -533,6 +533,7 @@ public static FileSplit createTestFile(
 
         Optional<HiveFileWriter> fileWriter = fileWriterFactory.createFileWriter(
                 new Path(filePath),
+                false,
                 testColumns.stream()
                         .map(TestColumn::getName)
                         .collect(toList()),
@@ -250,6 +250,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio
                 config.getHiveStorageFormat(),
                 ImmutableList.of(),
                 Optional.empty(),
+                false,
                 "test",
                 ImmutableMap.of());
         JsonCodec<PartitionUpdate> partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class);
