
[HUDI-7138] Fix error table writer and schema registry provider (apache#10173)


---------

Co-authored-by: rmahindra123 <[email protected]>
rmahindra123 authored Nov 29, 2023
1 parent 91eabab commit 473cf9a
Showing 5 changed files with 18 additions and 12 deletions.
HoodieConversionUtils.scala
@@ -21,8 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.common.config.TypedProperties
 
 import java.{util => ju}
-import scala.collection.JavaConverters
-import scala.jdk.CollectionConverters.dictionaryAsScalaMapConverter
+import scala.collection.JavaConverters._
 
 object HoodieConversionUtils {
 
@@ -49,9 +48,7 @@ object HoodieConversionUtils {
   }
 
   def fromProperties(props: TypedProperties): Map[String, String] = {
-    props.asScala.map {
-      case (k, v) => (k.toString, v.toString)
-    }.toMap
+    props.asScala.toMap
   }
 
 }
HoodieSparkSqlWriter.scala
@@ -148,7 +148,7 @@ object HoodieSparkSqlWriter {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          props: TypedProperties): Schema = {
-    deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, props.toMap)
+    deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
   }
 
   def cleanup(): Unit = {
S3EventsHoodieIncrSource.java
@@ -35,7 +35,6 @@
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 
-import org.apache.parquet.Strings;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -141,7 +140,7 @@ public S3EventsHoodieIncrSource(
 
     // This is to ensure backward compatibility where we were using the
     // config SOURCE_FILE_FORMAT for file format in previous versions.
-    this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
+    this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
         ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
         : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
 
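This change swaps org.apache.parquet.Strings, a Parquet-internal utility that had leaked in as a dependency, for Hudi's own StringUtils in the null-or-empty check that drives the config fallback: DATAFILE_FORMAT wins when set, otherwise the legacy SOURCE_FILE_FORMAT is read. A minimal, self-contained sketch of that fallback pattern, using plain java.util.Properties and hypothetical key names and defaults in place of Hudi's config machinery:

import java.util.Properties;

public class ConfigFallbackSketch {
  // Hypothetical key names standing in for DATAFILE_FORMAT / SOURCE_FILE_FORMAT.
  static final String NEW_KEY = "example.source.datafile.format";
  static final String LEGACY_KEY = "example.source.file.format";

  // Prefer the new key; fall back to the legacy key (assumed default "parquet").
  static String resolveFileFormat(Properties props) {
    String fromNewKey = props.getProperty(NEW_KEY, "");
    return fromNewKey.isEmpty() ? props.getProperty(LEGACY_KEY, "parquet") : fromNewKey;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(LEGACY_KEY, "json");
    // Only the legacy key is set, so the fallback kicks in.
    System.out.println(resolveFileFormat(props)); // prints "json"
  }
}

Keeping the check on a first-party utility also avoids breakage if the Parquet-internal class moves or disappears in a later Parquet release.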
BaseErrorTableWriter.java
@@ -29,6 +29,8 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.SparkSession;
 
+import java.io.Serializable;
+
 /**
  * The class which handles error events while processing write records. All the
  * records which have a processing/write failure are triggered as error events to
@@ -38,7 +40,7 @@
  *
  * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
  */
-public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Serializable {
 
   // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
   // is set to this column in case of an error
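Making BaseErrorTableWriter implement Serializable matters because Spark serializes any object captured in an RDD or Dataset closure before shipping it to executors; a non-serializable capture fails at runtime with "Task not serializable". A small self-contained sketch of that general rule, with a hypothetical EventTagger standing in for the writer (this is not Hudi code):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.Arrays;

public class SerializableClosureSketch {
  // Hypothetical stand-in for a handler captured by a Spark closure.
  static class EventTagger implements Serializable {
    private final String tag;
    EventTagger(String tag) { this.tag = tag; }
    String apply(String record) { return tag + ":" + record; }
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("sketch").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      EventTagger tagger = new EventTagger("ERROR");
      JavaRDD<String> tagged = jsc.parallelize(Arrays.asList("a", "b"))
          // `tagger` is shipped to executors; without Serializable this map
          // would throw org.apache.spark.SparkException: Task not serializable.
          .map(tagger::apply);
      System.out.println(tagged.collect()); // prints [ERROR:a, ERROR:b]
    }
  }
}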
TestSchemaRegistryProvider.java
@@ -46,11 +46,18 @@ class TestSchemaRegistryProvider {
   private static final String REGISTRY_RESPONSE = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", "
       + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": "
       + "\\\"string\\\" }]}\"}";
   private static final String RAW_SCHEMA = "{\"type\": \"record\", \"namespace\": \"example\", "
       + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
       + "\"string\" }]}";
+  private static final String CONVERTED_SCHEMA = "{\"type\": \"record\", \"namespace\": \"com.example.hoodie\", "
+      + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
+      + "\"string\" }]}";
 
   private static Schema getExpectedSchema() {
     return new Schema.Parser().parse(RAW_SCHEMA);
   }
+
+  private static Schema getExpectedConvertedSchema() {
+    return new Schema.Parser().parse(CONVERTED_SCHEMA);
+  }
@@ -60,7 +67,6 @@ private static TypedProperties getProps() {
       put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
       put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value");
       put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
-      put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
       put("hoodie.deltastreamer.source.kafka.topic", "foo");
     }
   };
@@ -97,21 +103,23 @@ public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException
   public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
     props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getSourceSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
+    assertEquals(getExpectedConvertedSchema(), actual);
     verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
   }
 
   @Test
   public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
     props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getTargetSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
+    assertEquals(getExpectedConvertedSchema(), actual);
     verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
   }
 
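The reworked tests opt into the schema converter per test (instead of in the shared getProps()) and assert against the converted schema, whose namespace changes from example to com.example.hoodie. DummySchemaConverter itself is not part of this diff; a plausible sketch of such a namespace rewrite using plain Avro APIs (the class and method names here are illustrative, not Hudi's converter interface):

import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;

public class NamespaceRewriteSketch {
  // Rewrites the namespace of a record schema, keeping name and fields intact.
  static Schema rewriteNamespace(Schema in, String newNamespace) {
    List<Schema.Field> fields = new ArrayList<>();
    for (Schema.Field f : in.getFields()) {
      // Field objects cannot be attached to two schemas, so copy each one.
      fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    return Schema.createRecord(in.getName(), in.getDoc(), newNamespace, in.isError(), fields);
  }

  public static void main(String[] args) {
    Schema source = new Schema.Parser().parse(
        "{\"type\": \"record\", \"namespace\": \"example\", \"name\": \"FullName\","
            + "\"fields\": [{ \"name\": \"first\", \"type\": \"string\" }]}");
    Schema converted = rewriteNamespace(source, "com.example.hoodie");
    System.out.println(converted.getFullName()); // prints com.example.hoodie.FullName
  }
}

Copying each Schema.Field is required because Avro throws if the same Field instance is reused across schemas.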
