Skip to content

Commit

Permalink
feat: add a new default SchemaRegistryClient and remove default for S…
Browse files Browse the repository at this point in the history
…R url (#4325)
  • Loading branch information
stevenpyzhang authored Jan 16, 2020
1 parent 89b7fcf commit e045f7c
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 16 deletions.
3 changes: 3 additions & 0 deletions config/ksql-production-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ ksql.logging.processing.stream.auto.create=true
# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=localhost:9092

# uncomment the below to point to a Schema Registry cluster
# ksql.schema.registry.url=http://localhost:8081

# uncomment the below to start an embedded Connect worker
# ksql.connect.worker.config=config/connect.properties
# ksql.connect.configs.topic=ksql-connect-configs
Expand Down
3 changes: 3 additions & 0 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ ksql.logging.processing.stream.auto.create=true
# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=localhost:9092

# uncomment the below to point to a Schema Registry cluster
# ksql.schema.registry.url=http://localhost:8081

# uncomment the below to start an embedded Connect worker
# ksql.connect.worker.config=config/connect.properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,9 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
).define(
SCHEMA_REGISTRY_URL_PROPERTY,
ConfigDef.Type.STRING,
DEFAULT_SCHEMA_REGISTRY_URL,
"",
ConfigDef.Importance.MEDIUM,
"The URL for the schema registry, defaults to http://localhost:8081"
"The URL for the schema registry"
).define(
CONNECT_URL_PROPERTY,
ConfigDef.Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.schema.registry;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;

import io.confluent.ksql.rest.ErrorMessages;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigException;

/**
* Implements the SchemaRegistryClient interface. Used as default when the Schema Registry URL isn't
* specified in {@link io.confluent.ksql.util.KsqlConfig}
*/
public class DefaultSchemaRegistryClient implements SchemaRegistryClient {

private final ConfigException configException;

public DefaultSchemaRegistryClient(final ErrorMessages errorMessages) {
configException = new ConfigException(errorMessages.schemaRegistryUnconfiguredErrorMessage());
}

@Override
public Optional<ParsedSchema> parseSchema(
final String var1, final String var2,
final List<SchemaReference> var3
) {
throw configException;
}

@Override
public int register(final String s, final ParsedSchema parsedSchema) {
throw configException;
}

@Override
public int register(final String s, final ParsedSchema parsedSchema, final int i, final int i1) {
throw configException;
}

@Override
public ParsedSchema getSchemaById(final int i) {
throw configException;
}

@Override
public ParsedSchema getSchemaBySubjectAndId(final String s, final int i) {
throw configException;
}

@Override
public Collection<String> getAllSubjectsById(final int i) {
throw configException;
}

@Override
public SchemaMetadata getLatestSchemaMetadata(final String s) {
throw configException;
}

@Override
public SchemaMetadata getSchemaMetadata(final String s, final int i) {
throw configException;
}

@Override
public int getVersion(final String s, final ParsedSchema parsedSchema) {
throw configException;
}

@Override
public List<Integer> getAllVersions(final String s) {
throw configException;
}

@Override
public boolean testCompatibility(final String s, final ParsedSchema parsedSchema) {
throw configException;
}

@Override
public String updateCompatibility(final String s, final String s1) {
throw configException;
}

@Override
public String getCompatibility(final String s) {
throw configException;
}

@Override
public String setMode(final String s) {
throw configException;
}

@Override
public String setMode(final String s, final String s1) {
throw configException;
}

@Override
public String getMode() {
throw configException;
}

@Override
public String getMode(final String s) {
throw configException;
}

@Override
public Collection<String> getAllSubjects() {
return Collections.emptyList();
}

@Override
public int getId(final String s, final ParsedSchema parsedSchema) {
throw configException;
}

@Override
public List<Integer> deleteSubject(final String s) {
throw configException;
}

@Override
public List<Integer> deleteSubject(final Map<String, String> map, final String s) {
throw configException;
}

@Override
public Integer deleteSchemaVersion(final String s, final String s1) {
throw configException;
}

@Override
public Integer deleteSchemaVersion(
final Map<String, String> map,
final String s,
final String s1
) {
throw configException;
}

@Override
public void reset() {
throw configException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import org.apache.kafka.common.network.Mode;
Expand All @@ -36,27 +38,32 @@ public class KsqlSchemaRegistryClientFactory {
private final Map<String, Object> schemaRegistryClientConfigs;
private final SchemaRegistryClientFactory schemaRegistryClientFactory;
private final Map<String, String> httpHeaders;
private final String schemaRegistryUrl;
private final ErrorMessages errorMessages;

interface SchemaRegistryClientFactory {
CachedSchemaRegistryClient create(RestService service,
int identityMapCapacity,
Map<String, Object> clientConfigs,
Map<String, String> httpHeaders);
}

public KsqlSchemaRegistryClientFactory(
final KsqlConfig config,
final ErrorMessages errorMessages,
final Map<String, String> schemaRegistryHttpHeaders
) {
this(config, newSchemaRegistrySslFactory(config), schemaRegistryHttpHeaders);
this(config, errorMessages, newSchemaRegistrySslFactory(config), schemaRegistryHttpHeaders);
}

public KsqlSchemaRegistryClientFactory(
final KsqlConfig config,
final ErrorMessages errorMessages,
final SslFactory sslFactory,
final Map<String, String> schemaRegistryHttpHeaders
) {
this(config,
errorMessages,
() -> new RestService(config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY)),
sslFactory,
CachedSchemaRegistryClient::new,
Expand All @@ -69,6 +76,7 @@ public KsqlSchemaRegistryClientFactory(

@VisibleForTesting
KsqlSchemaRegistryClientFactory(final KsqlConfig config,
final ErrorMessages errorMessages,
final Supplier<RestService> serviceSupplier,
final SslFactory sslFactory,
final SchemaRegistryClientFactory schemaRegistryClientFactory,
Expand All @@ -80,6 +88,8 @@ public KsqlSchemaRegistryClientFactory(

this.schemaRegistryClientFactory = schemaRegistryClientFactory;
this.httpHeaders = httpHeaders;
this.schemaRegistryUrl = config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY).trim();
this.errorMessages = Objects.requireNonNull(errorMessages, "errorMessages");
}

/**
Expand All @@ -98,6 +108,10 @@ static void configureSslFactory(final KsqlConfig config, final SslFactory sslFac
}

public SchemaRegistryClient get() {
if (schemaRegistryUrl.equals("")) {
return new DefaultSchemaRegistryClient(errorMessages);
}

final RestService restService = serviceSupplier.get();
final SSLContext sslContext = sslFactory.sslEngineBuilder().sslContext();
if (sslContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.services;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
Expand All @@ -35,7 +36,10 @@ public static ServiceContext create(
return create(
ksqlConfig,
new DefaultKafkaClientSupplier(),
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get,
new KsqlSchemaRegistryClientFactory(
ksqlConfig,
new DefaultErrorMessages(),
Collections.emptyMap())::get,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty()),
ksqlClientSupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@

package io.confluent.ksql.schema.registry;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -31,6 +30,7 @@
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +61,8 @@ public class KsqlSchemaRegistryClientFactoryTest {
private SslFactory sslFactory;
@Mock
private SslEngineBuilder sslEngineBuilder;
@Mock
private ErrorMessages errorMessages;

@Mock
private KsqlSchemaRegistryClientFactory.SchemaRegistryClientFactory srClientFactory;
Expand All @@ -76,10 +78,6 @@ public void setUp() {
when(sslEngineBuilder.sslContext()).thenReturn(SSL_CONTEXT);
}

@Test
public void should() {
}

@Test
public void shouldSetSocketFactoryWhenNoSpecificSslConfig() {
// Given:
Expand Down Expand Up @@ -129,12 +127,34 @@ public void shouldPickUpPrefixedSslConfig() {
verify(sslFactory).configure(expectedConfigs);
}

@Test
public void shouldUseDefaultSchemaRegistryClientWhenUrlNotSpecified() {
// Given
final KsqlConfig config1 = config();

final Map<String, Object> schemaRegistryClientConfigs = ImmutableMap.of(
"ksql.schema.registry.url", " "
);
final KsqlConfig config2 = new KsqlConfig(schemaRegistryClientConfigs);

// When:
SchemaRegistryClient client1 = new KsqlSchemaRegistryClientFactory(
config1, errorMessages, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();
SchemaRegistryClient client2 = new KsqlSchemaRegistryClientFactory(
config2, errorMessages, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();

// Then:
assertThat(client1, instanceOf(DefaultSchemaRegistryClient.class));
assertThat(client2, instanceOf(DefaultSchemaRegistryClient.class));
}

@Test
public void shouldPassBasicAuthCredentialsToSchemaRegistryClient() {
// Given
final Map<String, Object> schemaRegistryClientConfigs = ImmutableMap.of(
"ksql.schema.registry.basic.auth.credentials.source", "USER_INFO",
"ksql.schema.registry.basic.auth.user.info", "username:password"
"ksql.schema.registry.basic.auth.user.info", "username:password",
"ksql.schema.registry.url", "some url"
);

final KsqlConfig config = new KsqlConfig(schemaRegistryClientConfigs);
Expand All @@ -145,7 +165,7 @@ public void shouldPassBasicAuthCredentialsToSchemaRegistryClient() {

// When:
new KsqlSchemaRegistryClientFactory(
config, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();
config, errorMessages, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();

// Then:
verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,10 @@ static KsqlRestApplication buildApplication(
) {
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory =
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get;
new KsqlSchemaRegistryClientFactory(ksqlConfig, restConfig.getConfiguredInstance(
KsqlRestConfig.KSQL_SERVER_ERROR_MESSAGES,
ErrorMessages.class
),Collections.emptyMap())::get;
final ServiceContext serviceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory));
Expand Down
Loading

0 comments on commit e045f7c

Please sign in to comment.