
Add Protobuf support for trino-kafka #14734

Merged

merged 2 commits into trinodb:master on Nov 23, 2022

Conversation

nevillelyh
Member

@nevillelyh nevillelyh commented Oct 24, 2022

Description

Add Protobuf support for Kafka

This is a contribution from the Starburst Kafka Connector

Non-technical explanation

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(x) Release notes are required, with the following suggested text:

# Section
* Add Protobuf message encoding/decoding support for Kafka connector

@cla-bot cla-bot bot added the cla-signed label Oct 24, 2022
@github-actions github-actions bot added the docs label Oct 24, 2022
@nevillelyh
Member Author

Code, tests & docs are ready. Still missing product tests, will add later.

pom.xml (resolved review thread)
@nevillelyh
Member Author

It's still failing suite-6-non-generic after I fixed the kafka-protobuf-provider dependency in test containers. Cannot reproduce locally. Possibly a caching issue?

presto-master       | java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider
presto-master       | 	at io.trino.plugin.kafka.schema.confluent.ConfluentModule$LazyLoadedProtobufSchemaProvider.create(ConfluentModule.java:186)
presto-master       | 	at com.google.common.base.Suppliers$NonSerializableMemoizingSupplier.get(Suppliers.java:183)
presto-master       | 	at io.trino.plugin.kafka.schema.confluent.ConfluentModule$LazyLoadedProtobufSchemaProvider.parseSchema(ConfluentModule.java:181)
presto-master       | 	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.parseSchema(CachedSchemaRegistryClient.java:197)
presto-master       | 	at io.trino.plugin.kafka.schema.confluent.ClassLoaderSafeSchemaRegistryClient.parseSchema(ClassLoaderSafeSchemaRegistryClient.java:49)

@nevillelyh nevillelyh force-pushed the neville/kafka-protobuf branch 2 times, most recently from 3595513 to 6f85b81 on November 2, 2022 at 16:24
@michaelpearce-gain

michaelpearce-gain commented Nov 4, 2022

One thing I couldn't spot when looking at this: where is the logic to automatically deduce whether to use Protobuf or Avro when using the schema registry client, given that a broker may have topics with both?

The schema registry now stores the schema type and likewise supports both Avro and Protobuf, so from the schema ID, or from the subject's latest schema, you can deduce whether it is Avro or Protobuf. That makes it easy to switch the deserialization method (Avro vs. Protobuf) based on this, so a broker can have both formats in use and the right one can be deduced per topic:

SchemaRegistryClient client = ...; // e.g. a CachedSchemaRegistryClient

ParsedSchema parsedSchema = client.getSchemaById(schemaId);
String type = parsedSchema.schemaType(); // "AVRO" or "PROTOBUF"

I could be blind and have missed it, in which case please point me to it.
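For illustration, a minimal sketch of the kind of check being described, assuming the Confluent schema registry client API; the class and helper below are hypothetical and not part of this PR:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public final class SchemaTypeProbe
{
    private SchemaTypeProbe() {}

    // Returns true when the schema registered under the given id is Protobuf,
    // false when it is Avro; any other schema type is rejected.
    public static boolean isProtobuf(SchemaRegistryClient client, int schemaId)
            throws Exception
    {
        ParsedSchema schema = client.getSchemaById(schemaId);
        String type = schema.schemaType(); // "AVRO", "PROTOBUF", or "JSON"
        switch (type) {
            case "PROTOBUF":
                return true;
            case "AVRO":
                return false;
            default:
                throw new IllegalArgumentException("Unsupported schema type: " + type);
        }
    }
}

A caller could then pick the Avro or Protobuf deserialization path per record based on the result.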

@nevillelyh nevillelyh force-pushed the neville/kafka-protobuf branch 3 times, most recently from 1e828f6 to 2649add on November 4, 2022 at 21:58
@nevillelyh
Member Author

Tests finally green. @Praveen2112 PTAL when you get a chance? 🙏

plugin/trino-kafka/pom.xml (resolved review thread)
protoType.getValueType(),
definedMessages))
.build());
// Handle for underscores and name
Member

Can we add a test for this case?

Member Author
@nevillelyh nevillelyh Nov 7, 2022

What do we want to test here, that the descriptor is constructed correctly? That's covered by TestProtobuf{Encoder,Decoder} already. Commenting out anything except .setName(field.getName()) fails the tests.

EDIT: actually, setName(field.getName()) is already called before the if branch, so this call is redundant.

@nevillelyh nevillelyh force-pushed the neville/kafka-protobuf branch 2 times, most recently from d662a51 to 0914de4 on November 8, 2022 at 02:18
Member
@Praveen2112 Praveen2112 left a comment

I think we could have a single commit that implements Protobuf support in the decoder module and uses it in the Kafka connector for now.

@nevillelyh
Member Author

@Praveen2112 rebased into two commits: the decoder module and Kafka support.

Member
@Praveen2112 Praveen2112 left a comment

LGTM. Minor comments.

<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
</dependency>

<dependency>
Member

Should we exclude a few resources to keep the duplicate class finder happy?

Member Author

Looks like Guava is the only one we can exclude? Kotlin is already pinned to make enforcer happy.
https://mvnrepository.com/artifact/com.squareup.wire/wire-schema/3.2.2

Speaking of, do we want a more recent version of this? They removed the Guava dependency but bumped Kotlin, which should be backwards compatible.
https://mvnrepository.com/artifact/com.squareup.wire/wire-schema/4.4.3

Member

We could try updating to the latest version. We need to confirm whether the schema-registry libraries have any conflict with the latest version.

return Optional.of(columnDecoders.entrySet().stream()
.collect(toImmutableMap(
Map.Entry::getKey,
entry -> entry.getValue().decodeField(dynamicMessageProvider.parseDynamicMessage(data)))));
Member

Can we parse it once and use the DynamicMessage for all the required columns?

Member Author

Good catch!
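A minimal sketch of the suggested change, reusing the names from the quoted snippet above (the exact code that was merged may differ):

// Parse the payload into a DynamicMessage once, then reuse it for every column,
// instead of re-parsing it inside the per-column lambda.
DynamicMessage message = dynamicMessageProvider.parseDynamicMessage(data);
return Optional.of(columnDecoders.entrySet().stream()
        .collect(toImmutableMap(
                Map.Entry::getKey,
                entry -> entry.getValue().decodeField(message))));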

Member

Did we fix this?

Member Author

Pretty sure I did but must've lost it during rebase. Just fixed.

docs/src/main/sphinx/connector/kafka.rst (resolved review thread)
@Praveen2112
Member

Let me know once the changes are applied.

@nevillelyh
Member Author

@Praveen2112 hey, yeah, I addressed most of them except the question regarding exclusions: #14734 (comment)
Thoughts?

Member
@Praveen2112 Praveen2112 left a comment

Some of the changes were not applied.

<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
</dependency>

<dependency>
Member

We could try updating to the latest version. We need to confirm whether the schema-registry libraries have any conflict with the latest version.

pom.xml (outdated; resolved review thread)
return round(micros, MAX_SHORT_PRECISION - precision);
}

private static Descriptors.FieldDescriptor getFieldDescriptor(DynamicMessage message, String name)
Member

Ditto

@nevillelyh nevillelyh force-pushed the neville/kafka-protobuf branch 3 times, most recently from 8e825c2 to 3db448d on November 15, 2022 at 16:48
@nevillelyh
Member Author

nevillelyh commented Nov 15, 2022

@Praveen2112 I localized the duplicate-finder-maven-plugin settings, fixed a few more static imports, and am going to leave the wire-schema version as is for this change. Should be good now.

@nevillelyh nevillelyh force-pushed the neville/kafka-protobuf branch 2 times, most recently from 18729a3 to c46bb5a on November 23, 2022 at 01:24
@Praveen2112 Praveen2112 merged commit 55f306b into trinodb:master Nov 23, 2022
@Praveen2112
Member

Thanks for working on this.

@github-actions github-actions bot added this to the 404 milestone Nov 23, 2022
@nevillelyh nevillelyh deleted the neville/kafka-protobuf branch November 23, 2022 15:25
@ignitz

ignitz commented Nov 23, 2022

Holy shit guys. That's awesome. 🎉

@Giovarco

Giovarco commented Nov 30, 2022

Hello! I'm on version 403 and can't find a way to handle Protobuf messages (as far as I've tried), so this Protobuf support is nice to have! I'd like to test this. Have you documented anywhere how to use it?

Comment on lines +101 to +102
if (fieldDescriptor.getMessageType().getFullName().equals(TIMESTAMP_TYPE_NAME)) {
return createTimestampType(6);
Member

Per https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#google.protobuf.Timestamp

A Timestamp represents a point in time independent of any time zone or calendar, represented as seconds and fractions of seconds at nanosecond resolution in UTC Epoch time

I.e. it's an instant.
We don't have an instant type (#2273), so we represent point-in-time data as "timestamp with time zone".

Would you consider changing the mapping to reveal the point-in-time semantics of the data?
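For illustration, a sketch of what that alternative mapping might look like, assuming Trino's createTimestampWithTimeZoneType factory; this reflects the reviewer's suggestion, not what this PR merged:

if (fieldDescriptor.getMessageType().getFullName().equals(TIMESTAMP_TYPE_NAME)) {
    // point-in-time semantics instead of the merged createTimestampType(6)
    return createTimestampWithTimeZoneType(6);
}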

Member

Also, the type is nano-second precision. Why do we map it to microsecond precision?

cc @mx123

Member Author

These are direct ports from SEP; maybe @Praveen2112 remembers why?
Changing the semantics or precision might break backwards compatibility; we might have to support every combination of local/instant semantics and precision?

Member

> Changing the semantics or precision might break backwards compatibility

Of course. Every change, including bug fixes, can break something.

> we might have to support every combination of local/instant semantics and precision?

Not sure why we would want that.
