Skip to content

Commit

Permalink
fix kafka catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
mchades committed Jul 9, 2024
1 parent 909448b commit 3484db8
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package com.datastrato.gravitino.catalog.kafka;

import static com.datastrato.gravitino.StringIdentifier.DUMMY_ID;
import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
import static com.datastrato.gravitino.StringIdentifier.newPropertiesWithId;
import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS;
Expand Down Expand Up @@ -568,9 +569,10 @@ private Map<String, String> buildNewTopicConfigs(Map<String, String> properties)
}

private void createDefaultSchemaIfNecessary() {
// If the default schema already exists, do nothing
// If the default schema already exists or is testing operation, do nothing
try {
if (store.exists(defaultSchemaIdent, Entity.EntityType.SCHEMA)) {
if (DUMMY_ID.toString().equals(info.properties().get(ID_KEY))
|| store.exists(defaultSchemaIdent, Entity.EntityType.SCHEMA)) {
return;
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public static void setUp() {
.withNamespace(Namespace.of(METALAKE_NAME))
.withType(MESSAGING)
.withProvider("kafka")
.withProperties(MOCK_CATALOG_PROPERTIES)
.withAuditInfo(
AuditInfo.builder()
.withCreator("testKafkaUser")
Expand Down Expand Up @@ -222,6 +223,7 @@ public void testKafkaCatalogConfiguration() {
.withCreator("testKafkaUser")
.withCreateTime(Instant.now())
.build())
.withProperties(MOCK_CATALOG_PROPERTIES)
.build();
KafkaCatalogOperations ops = new KafkaCatalogOperations(store, idGenerator);
Assertions.assertNull(ops.adminClientConfig);
Expand Down Expand Up @@ -257,6 +259,7 @@ public void testInitialization() {
.withCreator("testKafkaUser")
.withCreateTime(Instant.now())
.build())
.withProperties(MOCK_CATALOG_PROPERTIES)
.build();
KafkaCatalogOperations ops = new KafkaCatalogOperations(store, idGenerator);
ops.initialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.TestOperations;
import com.datastrato.gravitino.client.GravitinoMetalake;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
Expand Down Expand Up @@ -139,11 +140,19 @@ public static void shutdown() {

@Test
public void testCatalog() throws ExecutionException, InterruptedException {
// test create catalog
String catalogName = GravitinoITUtils.genRandomName("test_catalog");
String comment = "test catalog";
Map<String, String> properties =
ImmutableMap.of(BOOTSTRAP_SERVERS, kafkaBootstrapServers, "key1", "value1");

// test before creation
TestOperations.TestResult testResult =
metalake.testCatalogCreation(
catalogName, Catalog.Type.MESSAGING, PROVIDER, comment, properties);
Assertions.assertTrue(testResult.valid());
Assertions.assertNull(testResult.exception());

// test create catalog
Catalog createdCatalog = createCatalog(catalogName, comment, properties);
Assertions.assertEquals(catalogName, createdCatalog.name());
Assertions.assertEquals(comment, createdCatalog.comment());
Expand Down Expand Up @@ -190,6 +199,21 @@ public void testCatalogException() {
IllegalArgumentException.class, () -> catalog1.asSchemas().listSchemas());
Assertions.assertTrue(exception.getMessage().contains("Invalid url in bootstrap.servers: 2"));

// test before creation
ImmutableMap<String, String> illegalProps = ImmutableMap.of("abc", "2");
TestOperations.TestResult testResult =
metalake.testCatalogCreation(
GravitinoITUtils.genRandomName("test_catalog"),
Catalog.Type.MESSAGING,
PROVIDER,
"comment",
illegalProps);
Assertions.assertFalse(testResult.valid());
Assertions.assertInstanceOf(IllegalArgumentException.class, testResult.exception());
Assertions.assertEquals(
"Properties are required and must be set: [bootstrap.servers]",
testResult.exception().getMessage());

exception =
Assertions.assertThrows(
IllegalArgumentException.class,
Expand All @@ -199,7 +223,7 @@ public void testCatalogException() {
Catalog.Type.MESSAGING,
PROVIDER,
"comment",
ImmutableMap.of("abc", "2")));
illegalProps));
Assertions.assertTrue(
exception
.getMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class StringIdentifier {

public static final String ID_KEY = "gravitino.identifier";

/** For testing operations only */
public static final StringIdentifier DUMMY_ID = fromId(-1L);

@VisibleForTesting static final int CURRENT_FORMAT_VERSION = 1;

@VisibleForTesting static final String CURRENT_FORMAT = "gravitino.v%d.uid%d";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package com.datastrato.gravitino.catalog;

import static com.datastrato.gravitino.StringIdentifier.DUMMY_ID;
import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
import static com.datastrato.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForAlter;
import static com.datastrato.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
Expand Down Expand Up @@ -443,13 +444,13 @@ public TestOperations.TestResult testCatalogCreation(
String creator = PrincipalUtils.getCurrentPrincipal().getName();
CatalogEntity dummyEntity =
CatalogEntity.builder()
.withId(-1L) // dummy id
.withId(DUMMY_ID.id())
.withName(ident.name())
.withNamespace(ident.namespace())
.withType(type)
.withProvider(provider)
.withComment(comment)
.withProperties(mergedConfig)
.withProperties(StringIdentifier.newPropertiesWithId(DUMMY_ID, mergedConfig))
.withAuditInfo(
AuditInfo.builder()
.withCreator(creator)
Expand Down

0 comments on commit 3484db8

Please sign in to comment.