Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema Registry: Change Encoder to Serializer #27662

Merged
merged 11 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

### Breaking Changes

- Changed `SchemaRegistryApacheAvroEncoder` to `SchemaRegistryApacheAvroSerializer`.
- Changed `decodeMessageData` and `decodeMessageDataAsync` to `deserializeMessageData` and `deserializeMessageDataAsync`.
- Changed `encodeMessageData` and `encodeMessageDataAsync` to `serializeMessageData` and `serializeMessageDataAsync`.

### Bugs Fixed

### Other Changes
Expand All @@ -14,7 +18,7 @@

### Features Added

- Changed `SchemaRegistryApacheAvroEncoder` to deserialize `MessageWithMetadata` rather than tied to a binary format
- Changed `SchemaRegistryApacheAvroEncoder` to deserialize `MessageWithMetadata` rather than tied to a binary format
with preamble. Backwards compatibility with preamble format supported for this release. See issue #26449.

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBu

#### Create `SchemaRegistryAvroSerializer` through the builder

```java readme-sample-createSchemaRegistryAvroEncoder
SchemaRegistryApacheAvroEncoder encoder = new SchemaRegistryApacheAvroEncoderBuilder()
```java readme-sample-createSchemaRegistryAvroSerializer
SchemaRegistryApacheAvroSerializer serializer = new SchemaRegistryApacheAvroSerializerBuilder()
.schemaRegistryAsyncClient(schemaRegistryAsyncClient)
.schemaGroup("{schema-group}")
.buildEncoder();
.buildSerializer();
```

## Key concepts
Expand Down Expand Up @@ -105,13 +105,13 @@ The serializer in this library creates messages in a wire format. The format is
### Serialize
Serialize a strongly-typed object into Schema Registry-compatible avro payload.

```java readme-sample-encodeSample
```java readme-sample-serializeSample
PlayingCard playingCard = new PlayingCard();
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
playingCard.setIsFaceCard(false);
playingCard.setCardValue(5);

MessageWithMetadata message = encoder.encodeMessageData(playingCard,
MessageWithMetadata message = serializer.serializeMessageData(playingCard,
TypeReference.createInstance(MessageWithMetadata.class));
```

Expand All @@ -121,10 +121,10 @@ The avro type `PlayingCard` is available in samples package
### Deserialize
Deserialize a Schema Registry-compatible avro payload into a strongly-type object.

```java readme-sample-decodeSample
SchemaRegistryApacheAvroEncoder encoder = createAvroSchemaRegistryEncoder();
```java readme-sample-deserializeSample
SchemaRegistryApacheAvroSerializer serializer = createAvroSchemaRegistrySerializer();
MessageWithMetadata message = getSchemaRegistryAvroMessage();
PlayingCard playingCard = encoder.decodeMessageData(message, TypeReference.createInstance(PlayingCard.class));
PlayingCard playingCard = serializer.deserializeMessageData(message, TypeReference.createInstance(PlayingCard.class));
```

## Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
</properties>

<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.26.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-experimental</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
/**
* Schema Registry-based serializer implementation for Avro data format using Apache Avro.
*/
public final class SchemaRegistryApacheAvroEncoder {
public final class SchemaRegistryApacheAvroSerializer {
static final String AVRO_MIME_TYPE = "avro/binary";
static final byte[] RECORD_FORMAT_INDICATOR = new byte[]{0x00, 0x00, 0x00, 0x00};
static final int RECORD_FORMAT_INDICATOR_SIZE = RECORD_FORMAT_INDICATOR.length;
static final int SCHEMA_ID_SIZE = 32;

private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroEncoder.class);
private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializer.class);
private final SchemaRegistryAsyncClient schemaRegistryClient;
private final AvroSerializer avroSerializer;
private final SerializerOptions serializerOptions;
Expand All @@ -46,7 +46,7 @@ public final class SchemaRegistryApacheAvroEncoder {
* @param avroSerializer Serializer implemented using Apache Avro.
* @param serializerOptions Options to configure the serializer with.
*/
SchemaRegistryApacheAvroEncoder(SchemaRegistryAsyncClient schemaRegistryClient,
SchemaRegistryApacheAvroSerializer(SchemaRegistryAsyncClient schemaRegistryClient,
AvroSerializer avroSerializer, SerializerOptions serializerOptions) {
this.schemaRegistryClient = Objects.requireNonNull(schemaRegistryClient,
"'schemaRegistryClient' cannot be null.");
Expand All @@ -56,84 +56,84 @@ public final class SchemaRegistryApacheAvroEncoder {
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return The message encoded or {@code null} if the message could not be encoded.
* @return The message encoded or {@code null} if the message could not be serialized.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeReference<T> typeReference) {
return encodeMessageDataAsync(object, typeReference).block();
public <T extends MessageWithMetadata> T serializeMessageData(Object object, TypeReference<T> typeReference) {
return serializeMessageDataAsync(object, typeReference).block();
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param messageFactory Factory to create an instance given the serialized Avro.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return The message encoded or {@code null} if the message could not be encoded.
* @return The message encoded or {@code null} if the message could not be serialized.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeReference<T> typeReference,
public <T extends MessageWithMetadata> T serializeMessageData(Object object, TypeReference<T> typeReference,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the javadoc too to change from "Encodes an object into a message." to "Serializes an object into a message.". Update other JavaDocs of other APIs too.

Function<BinaryData, T> messageFactory) {
return encodeMessageDataAsync(object, typeReference, messageFactory).block();
return serializeMessageDataAsync(object, typeReference, messageFactory).block();
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return A Mono that completes with the encoded message.
* @return A Mono that completes with the serialized message.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object object,
public <T extends MessageWithMetadata> Mono<T> serializeMessageDataAsync(Object object,
TypeReference<T> typeReference) {

return encodeMessageDataAsync(object, typeReference, null);
return serializeMessageDataAsync(object, typeReference, null);
}

/**
* Encodes an object into a message.
* Serializes an object into a message.
*
* @param object Object to encode.
* @param object Object to serialize.
* @param typeReference Type of message to create.
* @param messageFactory Factory to create an instance given the serialized Avro. If null is passed in, then the
* no argument constructor will be used.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return A Mono that completes with the encoded message.
* @return A Mono that completes with the serialized message.
*
* @throws IllegalArgumentException if {@code messageFactory} is null and type {@code T} does not have a no
* argument constructor. Or if the schema could not ve fetched from {@code T}.
* argument constructor. Or if the schema could not be fetched from {@code T}.
* @throws RuntimeException if an instance of {@code T} could not be instantiated. Or there was a problem
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object object,
public <T extends MessageWithMetadata> Mono<T> serializeMessageDataAsync(Object object,
TypeReference<T> typeReference, Function<BinaryData, T> messageFactory) {

if (object == null) {
Expand Down Expand Up @@ -189,33 +189,33 @@ public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object obj
}

/**
* Decodes a message into its object.
* Deserializes a message into its object.
*
* @param message Object to encode.
* @param typeReference Message to encode to.
* @param message Object to deserialize.
* @param typeReference Message type to deserialize to.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return The message encoded.
* @return The message deserialized.
*
* @throws NullPointerException if {@code message} or {@code typeReference} is null.
*/
public <T> T decodeMessageData(MessageWithMetadata message, TypeReference<T> typeReference) {
return decodeMessageDataAsync(message, typeReference).block();
public <T> T deserializeMessageData(MessageWithMetadata message, TypeReference<T> typeReference) {
return deserializeMessageDataAsync(message, typeReference).block();
}

/**
* Decodes a message into its object.
* Deserializes a message into its object.
*
* @param message Object to encode.
* @param typeReference Message to encode to.
* @param message Object to deserialize.
* @param typeReference Message to deserialize to.
* @param <T> Concrete type of {@link MessageWithMetadata}.
*
* @return A Mono that completes when the message encoded. If {@code message.getBodyAsBinaryData()} is null or
* empty, then an empty Mono is returned.
*
* @throws NullPointerException if {@code message} or {@code typeReference} is null.
*/
public <T> Mono<T> decodeMessageDataAsync(MessageWithMetadata message, TypeReference<T> typeReference) {
public <T> Mono<T> deserializeMessageDataAsync(MessageWithMetadata message, TypeReference<T> typeReference) {
if (message == null) {
return monoError(logger, new NullPointerException("'message' cannot be null."));
} else if (typeReference == null) {
Expand Down Expand Up @@ -282,10 +282,10 @@ public <T> Mono<T> decodeMessageDataAsync(MessageWithMetadata message, TypeRefer
contents.reset();
}

return decodeMessageDataAsync(schemaId, contents, typeReference);
return deserializeMessageDataAsync(schemaId, contents, typeReference);
}

private <T> Mono<T> decodeMessageDataAsync(String schemaId, ByteBuffer buffer, TypeReference<T> typeReference) {
private <T> Mono<T> deserializeMessageDataAsync(String schemaId, ByteBuffer buffer, TypeReference<T> typeReference) {
return this.schemaRegistryClient.getSchema(schemaId)
.handle((registryObject, sink) -> {
final byte[] payloadSchema = registryObject.getDefinition().getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
import java.util.Objects;

/**
* The builder implementation for building {@link SchemaRegistryApacheAvroEncoder}.
* The builder implementation for building {@link SchemaRegistryApacheAvroSerializer}.
*
* @see SchemaRegistryApacheAvroEncoder
* @see SchemaRegistryApacheAvroSerializer
*/
public final class SchemaRegistryApacheAvroEncoderBuilder {
public final class SchemaRegistryApacheAvroSerializerBuilder {
private static final boolean AVRO_SPECIFIC_READER_DEFAULT = false;
private static final int MAX_CACHE_SIZE = 128;

private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroEncoderBuilder.class);
private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializerBuilder.class);
private Boolean autoRegisterSchemas;
private Boolean avroSpecificReader;
private SchemaRegistryAsyncClient schemaRegistryAsyncClient;
Expand All @@ -32,7 +32,7 @@ public final class SchemaRegistryApacheAvroEncoderBuilder {
/**
* Instantiates instance of Builder class. Supplies client defaults.
*/
public SchemaRegistryApacheAvroEncoderBuilder() {
public SchemaRegistryApacheAvroSerializerBuilder() {
this.autoRegisterSchemas = false;
this.avroSpecificReader = false;
}
Expand All @@ -46,9 +46,9 @@ public SchemaRegistryApacheAvroEncoderBuilder() {
*
* @param schemaGroup Azure Schema Registry schema group
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
*/
public SchemaRegistryApacheAvroEncoderBuilder schemaGroup(String schemaGroup) {
public SchemaRegistryApacheAvroSerializerBuilder schemaGroup(String schemaGroup) {
this.schemaGroup = schemaGroup;
return this;
}
Expand All @@ -64,9 +64,9 @@ public SchemaRegistryApacheAvroEncoderBuilder schemaGroup(String schemaGroup) {
*
* @param autoRegisterSchemas flag for schema auto-registration
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
*/
public SchemaRegistryApacheAvroEncoderBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
public SchemaRegistryApacheAvroSerializerBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
this.autoRegisterSchemas = autoRegisterSchemas;
return this;
}
Expand All @@ -78,9 +78,9 @@ public SchemaRegistryApacheAvroEncoderBuilder autoRegisterSchema(boolean autoReg
* @param avroSpecificReader {@code true} to deserialize into {@link SpecificRecord} via {@link
* SpecificDatumReader}; {@code false} otherwise.
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance.
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
*/
public SchemaRegistryApacheAvroEncoderBuilder avroSpecificReader(boolean avroSpecificReader) {
public SchemaRegistryApacheAvroSerializerBuilder avroSpecificReader(boolean avroSpecificReader) {
this.avroSpecificReader = avroSpecificReader;
return this;
}
Expand All @@ -90,9 +90,9 @@ public SchemaRegistryApacheAvroEncoderBuilder avroSpecificReader(boolean avroSpe
*
* @param schemaRegistryAsyncClient The {@link SchemaRegistryAsyncClient}.
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance.
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
*/
public SchemaRegistryApacheAvroEncoderBuilder schemaRegistryAsyncClient(
public SchemaRegistryApacheAvroSerializerBuilder schemaRegistryAsyncClient(
SchemaRegistryAsyncClient schemaRegistryAsyncClient) {
this.schemaRegistryAsyncClient = schemaRegistryAsyncClient;
return this;
Expand All @@ -101,13 +101,13 @@ public SchemaRegistryApacheAvroEncoderBuilder schemaRegistryAsyncClient(
/**
* Creates a new instance of Schema Registry serializer.
*
* @return A new instance of {@link SchemaRegistryApacheAvroEncoder}.
* @return A new instance of {@link SchemaRegistryApacheAvroSerializer}.
*
* @throws NullPointerException if {@link #schemaRegistryAsyncClient(SchemaRegistryAsyncClient)} is {@code null}
* @throws IllegalStateException if {@link #autoRegisterSchema(boolean)} is {@code true} but {@link
* #schemaGroup(String) schemaGroup} is {@code null}.
*/
public SchemaRegistryApacheAvroEncoder buildEncoder() {
public SchemaRegistryApacheAvroSerializer buildSerializer() {
final boolean isAutoRegister = autoRegisterSchemas != null && autoRegisterSchemas;

if (Objects.isNull(schemaRegistryAsyncClient)) {
Expand All @@ -126,6 +126,6 @@ public SchemaRegistryApacheAvroEncoder buildEncoder() {
EncoderFactory.get(), DecoderFactory.get());
final SerializerOptions options = new SerializerOptions(schemaGroup, isAutoRegister, MAX_CACHE_SIZE);

return new SchemaRegistryApacheAvroEncoder(schemaRegistryAsyncClient, codec, options);
return new SchemaRegistryApacheAvroSerializer(schemaRegistryAsyncClient, codec, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ This project welcomes contributions and suggestions. See [Contributing][sdk_read
[sdk_readme_troubleshooting]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#troubleshooting
[sdk_readme_next_steps]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#next-steps
[sdk_readme_contributing]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#contributing
[sample_avro_serialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroEncoderSample.java
[sample_avro_serialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerSample.java
[sample_avro_deserialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryAvroDeserializationSample.java

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%schemaregistry%2Fazure-data-schemaregistry-apacheavro%2Fsrc%2Fsamples%2README.png)
Loading