Support avro types for nullable fields generated by Connect. #592
return getKSQLSchemaForAvroSchema(schemaList.get(0));
}
}
throw new KsqlException(String.format("Cannot find correct type for avro type: %s",
In this case it would help to have a more informative error message about the restrictions we place on unions (at most 2 types, one of which must be null).
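A rough sketch of what more specific messages could look like. This is not the PR's code: `NullableUnionCheck` and `nonNullType` are hypothetical names, the union is modeled as a plain list of Avro type names rather than Avro `Schema` objects, and the exact rules (at most 2 types, one null, exactly one non-null) are an assumption based on the restriction described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; a union is modeled as a list of type names.
final class NullableUnionCheck {

  // Returns the single non-null type of a [null, T] union, or throws with a
  // message that names the restriction that was violated.
  static String nonNullType(List<String> unionTypes) {
    if (unionTypes.size() > 2) {
      throw new IllegalArgumentException(String.format(
          "Unsupported union %s: a union may have at most 2 types, found %d.",
          unionTypes, unionTypes.size()));
    }
    if (!unionTypes.contains("null")) {
      throw new IllegalArgumentException(String.format(
          "Unsupported union %s: one of the types must be null.", unionTypes));
    }
    List<String> nonNull = new ArrayList<>();
    for (String type : unionTypes) {
      if (!"null".equals(type)) {
        nonNull.add(type);
      }
    }
    if (nonNull.size() != 1) {
      throw new IllegalArgumentException(String.format(
          "Unsupported union %s: expected exactly one non-null type.", unionTypes));
    }
    return nonNull.get(0);
  }

  public static void main(String[] args) {
    System.out.println(nonNullType(Arrays.asList("null", "string")));
  }
}
```

Each failure names the offending union and the rule it broke, so a user can tell a three-way union apart from one that simply lacks null.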
assertThat("Incorrect field schema.", schema.fields().get(7).schema(), equalTo(Schema
    .STRING_SCHEMA));
}
Add some negative cases for union validation?
I've tested this against two Kafka Connect-sourced topics.
My guess is because the Avro schema has a
{
"type": "record",
"name": "LOGON",
"namespace": "ORCL.SOE2",
"fields": [
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "op_type",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "op_ts",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "current_ts",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "pos",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "LOGON_ID",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "CUSTOMER_ID",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "LOGON_DATE",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "ORCL.SOE2.LOGON"
}
Fortunately GoldenGate can be configured to drop the
ksql> CREATE STREAM LOGON WITH (KAFKA_TOPIC='ora-ogg2-SOE2-LOGON-avro', VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
ksql> select * from logon;
1515151793394 | @178724_31809_2000-11-08 23:08:51 | I | 2017-09-13 14:51:36.000000 | 2018-01-05 06:29:53.221000 | 00000000010105263863 | 178724.0 | 31809.0 | 2000-11-08 23:08:51
1515151793394 | @178725_91808_2009-06-29 02:38:11 | I | 2017-09-13 14:51:36.000000 | 2018-01-05 06:29:53.394000 | 00000000010105264035 | 178725.0 | 91808.0 | 2009-06-29 02:38:11
1515151793395 | @178726_78742_2007-11-06 15:29:38 | I | 2017-09-13 14:51:36.000000 | 2018-01-05 06:29:53.394001 | 00000000010105264167 | 178726.0 | 78742.0 | 2007-11-06 15:29:38
You probably want to add a test that fails if the union type isn't as expected, e.g., there is no null, or it doesn't have exactly 2 types.
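One possible shape for those negative cases, as a self-contained sketch. `UnionNegativeCases` and `isSupportedUnion` are hypothetical stand-ins (a real test would call the project's own conversion and expect its exception), and unions are modeled as lists of type names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the check under test; not the project's code.
final class UnionNegativeCases {

  // Accepts only a two-member union with exactly one "null" member.
  static boolean isSupportedUnion(List<String> types) {
    return types.size() == 2
        && types.contains("null")
        && !types.get(0).equals(types.get(1));
  }

  // Unions that should be rejected: no null, a single member, three members.
  static List<List<String>> negativeCases() {
    return Arrays.asList(
        Arrays.asList("string", "double"),
        Arrays.asList("string"),
        Arrays.asList("null", "string", "double"));
  }

  public static void main(String[] args) {
    for (List<String> union : negativeCases()) {
      if (isSupportedUnion(union)) {
        throw new AssertionError("Expected rejection of union " + union);
      }
    }
    System.out.println("all negative cases rejected");
  }
}
```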
LGTM
Schema schema = SerDeUtil.getSchemaFromAvro(avroSchemaStr);
fail();
} catch (KsqlException ksqlException) {
assertThat("", ksqlException.getMessage(), equalTo("Union type cannot have more than two "
If you aren't going to provide a message in assertThat(..), you can omit the first string param, i.e., just use assertThat(ksqlException.getMessage(), equalTo("....")).
LGTM
…ntinc#592)
* Support avro types for nullable fields generated by Connect.
* Applied review feedback.
* Applied review feedback.
With this PR, KSQL can handle the union field types that Connect generates for nullable fields. For instance, here is an example of the type Connect creates:
"name": "table",
"type": [
"null",
"string"
],
KSQL will use string as the type of this field.
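To illustrate the effect on a whole record, here is a minimal sketch that maps each nullable field to its non-null type. `NullableFieldTypes` and `resolve` are hypothetical names, not KSQL APIs; fields are modeled as lists of Avro type names, and every field is assumed to be a two-member union with exactly one null, as in the Connect-generated schema shown earlier in this thread.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; assumes each field is a [null, T] union.
final class NullableFieldTypes {

  // Maps each field name to the non-null member of its union, keeping field order.
  static Map<String, String> resolve(Map<String, List<String>> fields) {
    Map<String, String> resolved = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> field : fields.entrySet()) {
      for (String type : field.getValue()) {
        if (!"null".equals(type)) {
          resolved.put(field.getKey(), type);
        }
      }
    }
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, List<String>> fields = new LinkedHashMap<>();
    fields.put("table", Arrays.asList("null", "string"));
    fields.put("LOGON_ID", Arrays.asList("null", "double"));
    System.out.println(resolve(fields)); // {table=string, LOGON_ID=double}
  }
}
```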