Parquet: Support parquet modular encryption #2639
Conversation
I'm approving this to run unit tests. Thanks for working on this, @ggershinsky!
@ggershinsky, the doc you linked to is quite long, and it isn't clear which part of it explains what you're doing here. Can you provide a quick summary of how to plug into Parquet encryption and what this does? I'm assuming that it provides Parquet's equivalent of an …
Thanks @rdblue!
Certainly. There are two encryption interfaces in parquet-mr 1.12.0: low-level (a direct implementation of the spec; maximum flexibility; no key management) and high-level (a layer on top of the low-level one, with lib-local key management tools driven by Hadoop properties). In Iceberg, we'll use the low-level Parquet encryption API directly, because key management will be done by Iceberg in a similar fashion for all formats, and because Iceberg has a centralized manifest capability, which makes key management more efficient than running lib-local key management in each worker process.
Well, here we have the gap that I've described in the other PR. To encrypt data (and delete) files, we need the AES keys - the DEKs ("data encryption keys") that actually encrypt the data and metadata modules; there must be a unique DEK per file/column. In a future version, we might want to generate the DEKs (or get them from a KMS) in the manifest writer process and then distribute them to the data/delete file writers, with a unique DEK per file (or a set of unique DEKs per file, for column encryption). This seems more complicated and a worse fit for the current Iceberg flow; my suggestion would be to start with the reverse approach described above.
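For context, a minimal sketch of the low-level parquet-mr API this refers to (assuming parquet-mr 1.12.x; the in-memory random DEK below is a placeholder for a key that Iceberg's key management would supply per file):

import java.security.SecureRandom;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.ParquetCipher;

// Placeholder DEK; a real one is generated or fetched per file by Iceberg key management.
byte[] footerDek = new byte[16]; // AES-128
new SecureRandom().nextBytes(footerDek);

// Write side: with no per-column properties, the footer key encrypts all modules.
FileEncryptionProperties encryptionProps =
    FileEncryptionProperties.builder(footerDek)
        .withAlgorithm(ParquetCipher.AES_GCM_V1)
        .build();

// Read side: supply the same DEK through the read options.
FileDecryptionProperties decryptionProps =
    FileDecryptionProperties.builder()
        .withFooterKey(footerDek)
        .build();
ParquetReadOptions readOptions =
    ParquetReadOptions.builder().withDecryption(decryptionProps).build();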
Hi @ggershinsky, can you rebase this PR to include #2441?
Hi @flyrain, will do, thanks for the notice.
Force-pushed from 409a047 to 97f6080
Force-pushed from 72cb406 to 97f6080
@ggershinsky hello, is there any recent progress on this feature? We are looking forward to it.
Hi @liujinhui1994, yes, this feature is under active development. Per the community discussions, the encryption PRs will be updated in Q1'22; I'll probably have the commits ready later this month or in February.
Force-pushed from 97f6080 to b8b9270
@ggershinsky Let me know when it is ready for review. I would love to review this change.
Hi @shangxinli; frankly, the only dependency left is the parquet-mr update. This PR depends on the "uniform encryption" feature, which is already merged in the parquet master but not released yet. We've started this discussion in the last parquet community sync; it's on track. Once the parquet-mr release is cut from master (as 1.12.3 or 1.13.0 :), this Iceberg PR will be able to pass CI, ready for a review.
OK. I will have a look once I have time.
-- Xinli Shang
Force-pushed from 8b87cc0 to 380e3e2
  parquetEncryptionAlgorithm = ParquetCipher.AES_GCM_V1; // default
  LOG.info("No encryption algorithm specified. Using Parquet default - AES_GCM_V1");
} else {
  EncryptionAlgorithm icebergEncryptionAlgorithm = nativeParameters.encryptionAlgorithm();
Nit: a switch would look simpler than an if block.
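For illustration, a sketch of that switch (the Iceberg-side EncryptionAlgorithm constants here are assumptions that mirror ParquetCipher, whose real values include AES_GCM_V1 and AES_GCM_CTR_V1):

switch (icebergEncryptionAlgorithm) {
  case AES_GCM_V1:
    parquetEncryptionAlgorithm = ParquetCipher.AES_GCM_V1;
    break;
  case AES_GCM_CTR_V1:
    parquetEncryptionAlgorithm = ParquetCipher.AES_GCM_CTR_V1;
    break;
  default:
    throw new IllegalArgumentException(
        "Unsupported encryption algorithm: " + icebergEncryptionAlgorithm);
}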
} | ||
|
||
ByteBuffer footerDataKey = nativeParameters.fileKey();
if (null == footerDataKey) {
The file key must not be null when we push down encryption, right? If so, can we move this check to NativeFileCryptoParameters#build? That way we won't need to check whether the file key is null every time we convert NativeFileCryptoParameters to a Parquet/ORC config.
I believe ORC works with column keys, not file keys. So this is a Parquet-specific check.
Got it, thanks for the explanation.
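For illustration, a builder-time check along the lines suggested above could look like this (a sketch only; the field names and constructor of NativeFileCryptoParameters are assumptions, not this PR's actual code):

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public NativeFileCryptoParameters build() {
  // Hypothetical validation; fileKey and encryptionAlgorithm are assumed builder fields.
  Preconditions.checkState(
      fileKey != null, "Encryption key is required for native file encryption");
  return new NativeFileCryptoParameters(fileKey, encryptionAlgorithm);
}

Since ORC works with column keys rather than a single file key, the check ultimately stays in the Parquet-specific conversion instead.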
Force-pushed from 978c2b6 to dc7aa9b
@@ -139,6 +153,13 @@ private WriteBuilder(OutputFile file) {
   } else {
     this.conf = new Configuration();
   }
   if (file instanceof NativelyEncryptedFile) {
nit: empty line between blocks
try (ParquetFileReader fileReader =
    newReader(
        file, ParquetReadOptions.builder().withDecryption(decryptionProperties).build())) {
Style: it would be better to create the new read options on a separate line.
I tried a couple of ways to break the line(s), but the spotless check didn't accept them; spotless apply always returns the code to this form.
I'm not talking about reformatting the line breaks; I mean creating a variable:
ParquetReadOptions readOptions = ParquetReadOptions.builder().withDecryption(decryptionProperties).build();
try (ParquetFileReader fileReader = newReader(file, readOptions)) {
...
}
@ggershinsky, this is looking good, but there are a few minor updates needed, and it requires more testing to exercise more of the Iceberg-specific code. You can check code coverage to see whether you're hitting all the areas you've changed. Also, for the change to remove reflection, here's a diff, since I explored that locally:

[blue@work iceberg]$ git diff
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index af2fb0e80a..ee3a8f50be 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -28,20 +28,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.PageWriteStore;
-import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.InternalFileEncryptor;
import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
@@ -50,25 +46,6 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private static final Metrics EMPTY_METRICS = new Metrics(0L, null, null, null, null);
- private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet =
- DynConstructors.builder(PageWriteStore.class)
- .hiddenImpl(
- "org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
- CodecFactory.BytesCompressor.class,
- MessageType.class,
- ByteBufferAllocator.class,
- int.class,
- boolean.class,
- InternalFileEncryptor.class,
- int.class)
- .build();
-
- private static final DynMethods.UnboundMethod flushToWriter =
- DynMethods.builder("flushToFileWriter")
- .hiddenImpl(
- "org.apache.parquet.hadoop.ColumnChunkPageWriteStore", ParquetFileWriter.class)
- .build();
-
private final long targetRowGroupSize;
private final Map<String, String> metadata;
private final ParquetProperties props;
@@ -81,7 +58,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final OutputFile output;
private final Configuration conf;
- private DynMethods.BoundMethod flushPageStoreToWriter;
+ private ColumnChunkPageWriteStore pageStore = null;
private ColumnWriteStore writeStore;
private long recordCount = 0;
private long nextCheckRecordCount = 10;
@@ -232,7 +209,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
ensureWriterInitialized();
writer.startBlock(recordCount);
writeStore.flush();
- flushPageStoreToWriter.invoke(writer);
+ pageStore.flushToFileWriter(writer);
writer.endBlock();
if (!finished) {
writeStore.close();
@@ -253,8 +230,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
props.getMaxRowCountForPageSizeCheck());
this.recordCount = 0;
- PageWriteStore pageStore =
- pageStoreCtorParquet.newInstance(
+ this.pageStore =
+ new ColumnChunkPageWriteStore(
compressor,
parquetSchema,
props.getAllocator(),
@@ -264,9 +241,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
rowGroupOrdinal);
this.rowGroupOrdinal++;
- this.flushPageStoreToWriter = flushToWriter.bind(pageStore);
- this.writeStore =
- props.newColumnWriteStore(parquetSchema, pageStore, (BloomFilterWriteStore) pageStore);
+ this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore, pageStore);
model.setColumnStore(writeStore);
}
@ggershinsky this looks close. Just style and naming issues left.
@ggershinsky, looks like this is out of date. Can you rebase? |
Force-pushed from e391461 to 7c877d5
update and clean up
update unitest
clean up indent
clean up
update read conf style
update style
update
isolate from Spark/PME configuration
refactor common encrypted IO classes for unitests
format fixes
spotless apply
use key metadata
address review comments
clean up
post-review changes
post-review changes 2
update method names in tests
separate read options line
conflict resolution
conflict resolution 2
conflict resolution 3
spotless fix
add decryption check
Force-pushed from 091a560 to 2909aa3
@rdblue The PR is rebased.
Thanks, @ggershinsky!
Implements #1413