Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema Registry: Change Encoder to Serializer #27662

Merged
merged 11 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

### Breaking Changes

- Changed `SchemaRegistryApacheAvroEncoder` to `SchemaRegistryApacheAvroSerializer`.
- Changed `decodeMessageData` and `decodeMessageDataAsync` to `deserializeMessageData` and `deserializeMessageDataAsync`.
- Changed `encodeMessageData` and `encodeMessageDataAsync` to `serializeMessageData` and `serializeMessageDataAsync`.

### Bugs Fixed

### Other Changes
Expand All @@ -14,7 +18,7 @@

### Features Added

- Changed `SchemaRegistryApacheAvroEncoder` to deserialize `MessageWithMetadata` rather than tied to a binary format
- Changed `SchemaRegistryApacheAvroEncoder` to deserialize `MessageWithMetadata` rather than tied to a binary format
with preamble. Backwards compatibility with preamble format supported for this release. See issue #26449.

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBu

#### Create `SchemaRegistryAvroSerializer` through the builder

```java readme-sample-createSchemaRegistryAvroEncoder
SchemaRegistryApacheAvroEncoder encoder = new SchemaRegistryApacheAvroEncoderBuilder()
```java readme-sample-createSchemaRegistryAvroSerializer
SchemaRegistryApacheAvroSerializer serializer = new SchemaRegistryApacheAvroSerializerBuilder()
.schemaRegistryAsyncClient(schemaRegistryAsyncClient)
.schemaGroup("{schema-group}")
.buildEncoder();
.buildSerializer();
```

## Key concepts
Expand Down Expand Up @@ -105,13 +105,13 @@ The serializer in this library creates messages in a wire format. The format is
### Serialize
Serialize a strongly-typed object into Schema Registry-compatible avro payload.

```java readme-sample-encodeSample
```java readme-sample-serializeSample
PlayingCard playingCard = new PlayingCard();
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
playingCard.setIsFaceCard(false);
playingCard.setCardValue(5);

MessageWithMetadata message = encoder.encodeMessageData(playingCard,
MessageWithMetadata message = serializer.serializeMessageData(playingCard,
TypeReference.createInstance(MessageWithMetadata.class));
```

Expand All @@ -121,10 +121,10 @@ The avro type `PlayingCard` is available in samples package
### Deserialize
Deserialize a Schema Registry-compatible avro payload into a strongly-type object.

```java readme-sample-decodeSample
SchemaRegistryApacheAvroEncoder encoder = createAvroSchemaRegistryEncoder();
```java readme-sample-deserializeSample
SchemaRegistryApacheAvroSerializer serializer = createAvroSchemaRegistrySerializer();
MessageWithMetadata message = getSchemaRegistryAvroMessage();
PlayingCard playingCard = encoder.decodeMessageData(message, TypeReference.createInstance(PlayingCard.class));
PlayingCard playingCard = serializer.deserializeMessageData(message, TypeReference.createInstance(PlayingCard.class));
```

## Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
</properties>

<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.26.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-experimental</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
/**
* Schema Registry-based serializer implementation for Avro data format using Apache Avro.
*/
public final class SchemaRegistryApacheAvroEncoder {
public final class SchemaRegistryApacheAvroSerializer {
static final String AVRO_MIME_TYPE = "avro/binary";
static final byte[] RECORD_FORMAT_INDICATOR = new byte[]{0x00, 0x00, 0x00, 0x00};
static final int RECORD_FORMAT_INDICATOR_SIZE = RECORD_FORMAT_INDICATOR.length;
static final int SCHEMA_ID_SIZE = 32;

private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroEncoder.class);
private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializer.class);
private final SchemaRegistryAsyncClient schemaRegistryClient;
private final AvroSerializer avroSerializer;
private final SerializerOptions serializerOptions;
Expand All @@ -46,7 +46,7 @@ public final class SchemaRegistryApacheAvroEncoder {
* @param avroSerializer Serializer implemented using Apache Avro.
* @param serializerOptions Options to configure the serializer with.
*/
SchemaRegistryApacheAvroEncoder(SchemaRegistryAsyncClient schemaRegistryClient,
SchemaRegistryApacheAvroSerializer(SchemaRegistryAsyncClient schemaRegistryClient,
AvroSerializer avroSerializer, SerializerOptions serializerOptions) {
this.schemaRegistryClient = Objects.requireNonNull(schemaRegistryClient,
"'schemaRegistryClient' cannot be null.");
Expand All @@ -70,8 +70,8 @@ public final class SchemaRegistryApacheAvroEncoder {
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeReference<T> typeReference) {
return encodeMessageDataAsync(object, typeReference).block();
public <T extends MessageWithMetadata> T serializeMessageData(Object object, TypeReference<T> typeReference) {
return serializeMessageDataAsync(object, typeReference).block();
}

/**
Expand All @@ -90,9 +90,9 @@ public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeRe
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeReference<T> typeReference,
public <T extends MessageWithMetadata> T serializeMessageData(Object object, TypeReference<T> typeReference,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the javadoc too to change from "Encodes an object into a message." to "Serializes an object into a message.". Update other JavaDocs of other APIs too.

Function<BinaryData, T> messageFactory) {
return encodeMessageDataAsync(object, typeReference, messageFactory).block();
return serializeMessageDataAsync(object, typeReference, messageFactory).block();
}

/**
Expand All @@ -110,10 +110,10 @@ public <T extends MessageWithMetadata> T encodeMessageData(Object object, TypeRe
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object object,
public <T extends MessageWithMetadata> Mono<T> serializeMessageDataAsync(Object object,
TypeReference<T> typeReference) {

return encodeMessageDataAsync(object, typeReference, null);
return serializeMessageDataAsync(object, typeReference, null);
}

/**
Expand All @@ -133,7 +133,7 @@ public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object obj
* encoding the object.
* @throws NullPointerException if the {@code object} is null or {@code typeReference} is null.
*/
public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object object,
public <T extends MessageWithMetadata> Mono<T> serializeMessageDataAsync(Object object,
TypeReference<T> typeReference, Function<BinaryData, T> messageFactory) {

if (object == null) {
Expand Down Expand Up @@ -199,8 +199,8 @@ public <T extends MessageWithMetadata> Mono<T> encodeMessageDataAsync(Object obj
*
* @throws NullPointerException if {@code message} or {@code typeReference} is null.
*/
public <T> T decodeMessageData(MessageWithMetadata message, TypeReference<T> typeReference) {
return decodeMessageDataAsync(message, typeReference).block();
public <T> T deserializeMessageData(MessageWithMetadata message, TypeReference<T> typeReference) {
return deserializeMessageDataAsync(message, typeReference).block();
}

/**
Expand All @@ -215,7 +215,7 @@ public <T> T decodeMessageData(MessageWithMetadata message, TypeReference<T> typ
*
* @throws NullPointerException if {@code message} or {@code typeReference} is null.
*/
public <T> Mono<T> decodeMessageDataAsync(MessageWithMetadata message, TypeReference<T> typeReference) {
public <T> Mono<T> deserializeMessageDataAsync(MessageWithMetadata message, TypeReference<T> typeReference) {
if (message == null) {
return monoError(logger, new NullPointerException("'message' cannot be null."));
} else if (typeReference == null) {
Expand Down Expand Up @@ -282,10 +282,10 @@ public <T> Mono<T> decodeMessageDataAsync(MessageWithMetadata message, TypeRefer
contents.reset();
}

return decodeMessageDataAsync(schemaId, contents, typeReference);
return deserializeMessageDataAsync(schemaId, contents, typeReference);
}

private <T> Mono<T> decodeMessageDataAsync(String schemaId, ByteBuffer buffer, TypeReference<T> typeReference) {
private <T> Mono<T> deserializeMessageDataAsync(String schemaId, ByteBuffer buffer, TypeReference<T> typeReference) {
return this.schemaRegistryClient.getSchema(schemaId)
.handle((registryObject, sink) -> {
final byte[] payloadSchema = registryObject.getDefinition().getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
import java.util.Objects;

/**
* The builder implementation for building {@link SchemaRegistryApacheAvroEncoder}.
* The builder implementation for building {@link SchemaRegistryApacheAvroSerializer}.
*
* @see SchemaRegistryApacheAvroEncoder
* @see SchemaRegistryApacheAvroSerializer
*/
public final class SchemaRegistryApacheAvroEncoderBuilder {
public final class SchemaRegistryApacheAvroSerializerBuilder {
private static final boolean AVRO_SPECIFIC_READER_DEFAULT = false;
private static final int MAX_CACHE_SIZE = 128;

private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroEncoderBuilder.class);
private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializerBuilder.class);
private Boolean autoRegisterSchemas;
private Boolean avroSpecificReader;
private SchemaRegistryAsyncClient schemaRegistryAsyncClient;
Expand All @@ -32,7 +32,7 @@ public final class SchemaRegistryApacheAvroEncoderBuilder {
/**
* Instantiates instance of Builder class. Supplies client defaults.
*/
public SchemaRegistryApacheAvroEncoderBuilder() {
public SchemaRegistryApacheAvroSerializerBuilder() {
this.autoRegisterSchemas = false;
this.avroSpecificReader = false;
}
Expand All @@ -46,9 +46,9 @@ public SchemaRegistryApacheAvroEncoderBuilder() {
*
* @param schemaGroup Azure Schema Registry schema group
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
*/
public SchemaRegistryApacheAvroEncoderBuilder schemaGroup(String schemaGroup) {
public SchemaRegistryApacheAvroSerializerBuilder schemaGroup(String schemaGroup) {
this.schemaGroup = schemaGroup;
return this;
}
Expand All @@ -64,9 +64,9 @@ public SchemaRegistryApacheAvroEncoderBuilder schemaGroup(String schemaGroup) {
*
* @param autoRegisterSchemas flag for schema auto-registration
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance
*/
public SchemaRegistryApacheAvroEncoderBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
public SchemaRegistryApacheAvroSerializerBuilder autoRegisterSchema(boolean autoRegisterSchemas) {
this.autoRegisterSchemas = autoRegisterSchemas;
return this;
}
Expand All @@ -78,9 +78,9 @@ public SchemaRegistryApacheAvroEncoderBuilder autoRegisterSchema(boolean autoReg
* @param avroSpecificReader {@code true} to deserialize into {@link SpecificRecord} via {@link
* SpecificDatumReader}; {@code false} otherwise.
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance.
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
*/
public SchemaRegistryApacheAvroEncoderBuilder avroSpecificReader(boolean avroSpecificReader) {
public SchemaRegistryApacheAvroSerializerBuilder avroSpecificReader(boolean avroSpecificReader) {
this.avroSpecificReader = avroSpecificReader;
return this;
}
Expand All @@ -90,9 +90,9 @@ public SchemaRegistryApacheAvroEncoderBuilder avroSpecificReader(boolean avroSpe
*
* @param schemaRegistryAsyncClient The {@link SchemaRegistryAsyncClient}.
*
* @return updated {@link SchemaRegistryApacheAvroEncoderBuilder} instance.
* @return updated {@link SchemaRegistryApacheAvroSerializerBuilder} instance.
*/
public SchemaRegistryApacheAvroEncoderBuilder schemaRegistryAsyncClient(
public SchemaRegistryApacheAvroSerializerBuilder schemaRegistryAsyncClient(
SchemaRegistryAsyncClient schemaRegistryAsyncClient) {
this.schemaRegistryAsyncClient = schemaRegistryAsyncClient;
return this;
Expand All @@ -101,13 +101,13 @@ public SchemaRegistryApacheAvroEncoderBuilder schemaRegistryAsyncClient(
/**
* Creates a new instance of Schema Registry serializer.
*
* @return A new instance of {@link SchemaRegistryApacheAvroEncoder}.
* @return A new instance of {@link SchemaRegistryApacheAvroSerializer}.
*
* @throws NullPointerException if {@link #schemaRegistryAsyncClient(SchemaRegistryAsyncClient)} is {@code null}
* @throws IllegalStateException if {@link #autoRegisterSchema(boolean)} is {@code true} but {@link
* #schemaGroup(String) schemaGroup} is {@code null}.
*/
public SchemaRegistryApacheAvroEncoder buildEncoder() {
public SchemaRegistryApacheAvroSerializer buildSerializer() {
final boolean isAutoRegister = autoRegisterSchemas != null && autoRegisterSchemas;

if (Objects.isNull(schemaRegistryAsyncClient)) {
Expand All @@ -126,6 +126,6 @@ public SchemaRegistryApacheAvroEncoder buildEncoder() {
EncoderFactory.get(), DecoderFactory.get());
final SerializerOptions options = new SerializerOptions(schemaGroup, isAutoRegister, MAX_CACHE_SIZE);

return new SchemaRegistryApacheAvroEncoder(schemaRegistryAsyncClient, codec, options);
return new SchemaRegistryApacheAvroSerializer(schemaRegistryAsyncClient, codec, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ This project welcomes contributions and suggestions. See [Contributing][sdk_read
[sdk_readme_troubleshooting]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#troubleshooting
[sdk_readme_next_steps]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#next-steps
[sdk_readme_contributing]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/README.md#contributing
[sample_avro_serialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroEncoderSample.java
[sample_avro_serialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryApacheAvroSerializerSample.java
[sample_avro_deserialization]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/schemaregistry/azure-data-schemaregistry-apacheavro/src/samples/java/com/azure/data/schemaregistry/apacheavro/SchemaRegistryAvroDeserializationSample.java

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%schemaregistry%2Fazure-data-schemaregistry-apacheavro%2Fsrc%2Fsamples%2README.png)
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
public class ReadmeSamples {

/**
* Sample to demonstrate creation of {@link SchemaRegistryApacheAvroEncoder}.
* @return The {@link SchemaRegistryApacheAvroEncoder}.
* Sample to demonstrate creation of {@link SchemaRegistryApacheAvroSerializer}.
* @return The {@link SchemaRegistryApacheAvroSerializer}.
*/
public SchemaRegistryApacheAvroEncoder createAvroSchemaRegistryEncoder() {
public SchemaRegistryApacheAvroSerializer createAvroSchemaRegistrySerializer() {
// BEGIN: readme-sample-createSchemaRegistryAsyncClient
TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

Expand All @@ -34,42 +34,42 @@ public SchemaRegistryApacheAvroEncoder createAvroSchemaRegistryEncoder() {
.buildAsyncClient();
// END: readme-sample-createSchemaRegistryAsyncClient

// BEGIN: readme-sample-createSchemaRegistryAvroEncoder
SchemaRegistryApacheAvroEncoder encoder = new SchemaRegistryApacheAvroEncoderBuilder()
// BEGIN: readme-sample-createSchemaRegistryAvroSerializer
SchemaRegistryApacheAvroSerializer serializer = new SchemaRegistryApacheAvroSerializerBuilder()
.schemaRegistryAsyncClient(schemaRegistryAsyncClient)
.schemaGroup("{schema-group}")
.buildEncoder();
// END: readme-sample-createSchemaRegistryAvroEncoder
.buildSerializer();
// END: readme-sample-createSchemaRegistryAvroSerializer

return encoder;
return serializer;
}

/**
* Encode a strongly-typed object into avro payload compatible with schema registry.
*/
public void encodeSample() {
SchemaRegistryApacheAvroEncoder encoder = createAvroSchemaRegistryEncoder();
public void serializeSample() {
SchemaRegistryApacheAvroSerializer serializer = createAvroSchemaRegistrySerializer();

// BEGIN: readme-sample-encodeSample
// BEGIN: readme-sample-serializeSample
PlayingCard playingCard = new PlayingCard();
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
playingCard.setIsFaceCard(false);
playingCard.setCardValue(5);

MessageWithMetadata message = encoder.encodeMessageData(playingCard,
MessageWithMetadata message = serializer.serializeMessageData(playingCard,
TypeReference.createInstance(MessageWithMetadata.class));
// END: readme-sample-encodeSample
// END: readme-sample-serializeSample
}

/**
* Decode avro payload compatible with schema registry into a strongly-type object.
*/
public void decodeSample() {
// BEGIN: readme-sample-decodeSample
SchemaRegistryApacheAvroEncoder encoder = createAvroSchemaRegistryEncoder();
public void deserializeSample() {
// BEGIN: readme-sample-deserializeSample
SchemaRegistryApacheAvroSerializer serializer = createAvroSchemaRegistrySerializer();
MessageWithMetadata message = getSchemaRegistryAvroMessage();
PlayingCard playingCard = encoder.decodeMessageData(message, TypeReference.createInstance(PlayingCard.class));
// END: readme-sample-decodeSample
PlayingCard playingCard = serializer.deserializeMessageData(message, TypeReference.createInstance(PlayingCard.class));
// END: readme-sample-deserializeSample
}

/**
Expand Down
Loading