Skip to content

Commit

Permalink
[improve][io] The JDBC connector supports JSON substructure schema (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jul 21, 2024
1 parent d7e8ea1 commit d08e2e0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 8 deletions.
13 changes: 6 additions & 7 deletions pulsar-io/jdbc/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down Expand Up @@ -71,13 +77,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
Expand Down Expand Up @@ -173,7 +174,7 @@ private static void setColumnNull(PreparedStatement statement, int index, int ty

}

private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
protected void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {

log.debug("Setting column value, statement: {}, index: {}, value: {}", statement, index, value);

Expand All @@ -193,6 +194,8 @@ private static void setColumnValue(PreparedStatement statement, int index, Objec
statement.setShort(index, (Short) value);
} else if (value instanceof ByteString) {
statement.setBytes(index, ((ByteString) value).toByteArray());
} else if (value instanceof GenericJsonRecord) {
statement.setString(index, ((GenericJsonRecord) value).getJsonNode().toString());
} else {
throw new Exception("Not supported value type, need to add it. " + value.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@
*/
package org.apache.pulsar.io.jdbc;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.Utf8;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.functions.api.Record;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -169,4 +179,62 @@ public GenericRecord getValue() {
}


@Test
@SuppressWarnings("unchecked")
public void testSubFieldJsonArray() throws Exception {
BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {};

Field field = JdbcAbstractSink.class.getDeclaredField("jdbcSinkConfig");
field.setAccessible(true);
JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig();
jdbcSinkConfig.setNullValueAction(JdbcSinkConfig.NullValueAction.FAIL);
field.set(baseJdbcAutoSchemaSink, jdbcSinkConfig);

TStates tStates = new TStates("tstats", Arrays.asList(
new PC("brand1", "model1"),
new PC("brand2", "model2")
));
org.apache.pulsar.client.api.Schema<TStates> jsonSchema = org.apache.pulsar.client.api.Schema.JSON(TStates.class);
GenericJsonSchema genericJsonSchema = new GenericJsonSchema(jsonSchema.getSchemaInfo());
byte[] encode = jsonSchema.encode(tStates);
GenericRecord genericRecord = genericJsonSchema.decode(encode);

AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.JSON(TStates.class));
Record<? extends GenericObject> record = new Record<GenericRecord>() {
@Override
public org.apache.pulsar.client.api.Schema<GenericRecord> getSchema() {
return genericJsonSchema;
}

@Override
public GenericRecord getValue() {
return genericRecord;
}
};
JdbcAbstractSink.Mutation mutation = baseJdbcAutoSchemaSink.createMutation((Record<GenericObject>) record);
PreparedStatement mockPreparedStatement = mock(PreparedStatement.class);
baseJdbcAutoSchemaSink.setColumnValue(mockPreparedStatement, 0, mutation.getValues().apply("state"));
baseJdbcAutoSchemaSink.setColumnValue(mockPreparedStatement, 1, mutation.getValues().apply("pcList"));
verify(mockPreparedStatement).setString(0, "tstats");
verify(mockPreparedStatement).setString(1, "[{\"brand\":\"brand1\",\"model\":\"model1\"},{\"brand\":\"brand2\",\"model\":\"model2\"}]");
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class TStates {
public String state;
public List<PC> pcList;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class PC {
public String brand;
public String model;
}


}

0 comments on commit d08e2e0

Please sign in to comment.