Skip to content

Commit

Permalink
DataSource with optional version support, latest by default
Browse files Browse the repository at this point in the history
  • Loading branch information
noureddineseddik authored and arkiaconsulting committed Aug 28, 2021
1 parent dff69f0 commit 5fa552a
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 4 deletions.
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ resource "schemaregistry_schema" "main" {
Schema registry references can be used to allow [putting Several Event Types in the Same Topic](https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/).
Please refer to Confluent [Schema Registry API Reference](https://docs.confluent.io/platform/current/schema-registry/develop/api.html) for details.

Referenced versions will always be upgraded with the referenced event schema version.
### Upgrade reference version to the latest event schema version

Reference the event schema `resource` from a schema with reference wil upgrade a reference alongside with its referenced schema.

```
resource "schemaregistry_schema" "referenced_event" {
Expand Down Expand Up @@ -63,7 +65,33 @@ resource "schemaregistry_schema" "with_reference" {
version = schemaregistry_schema.referenced_event.version
}
}
```

### Stick reference version to a given version

Use a `dataSource` to stick a reference to a **given version**, while upgrading the referenced event schema.

```
resource "schemaregistry_schema" "referenced_event_latest" {
subject = "referenced_event_subject"
schema = file("<avro_schema_file_updated>")
}
data "schemaregistry_schema" "referenced_event_v1" {
subject = "other_referenced_event_subject"
schema = "{\"type\":\"record\",\"name\":\"other_event\",\"namespace\":\"akc.test\",\"fields\":[{\"name\":\"bar\",\"type\":\"string\"}]}"
}
resource "schemaregistry_schema" "with_reference_to_v1" {
subject = "with_reference_subject"
schema = "[\"akc.test.event\", \"akc.test.other_event\"]"
references {
name = "akc.test.event"
subject = data.schemaregistry_schema.referenced_event_v1.subject
version = data.schemaregistry_schema.referenced_event_v1.version
}
}
```

## The schema data source
Expand Down
6 changes: 6 additions & 0 deletions examples/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ data "schemaregistry_schema" "main" {
subject = schemaregistry_schema.with_reference.subject
}

data "schemaregistry_schema" "user_added_v1" {
subject = "MyTopic-akc.test.userAdded-value"
schema = 1
}


output "schema_id" {
value = data.schemaregistry_schema.main.id
}
Expand Down
14 changes: 11 additions & 3 deletions schemaregistry/data_source_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func dataSourceSchema() *schema.Resource {
},
"version": {
Type: schema.TypeInt,
Computed: true,
Optional: true,
Description: "The version of the schema",
},
"schema_id": {
Expand Down Expand Up @@ -64,13 +64,21 @@ func dataSourceSubjectRead(ctx context.Context, d *schema.ResourceData, m interf
var diags diag.Diagnostics

subject := d.Get("subject").(string)
version := d.Get("version").(int)

client := m.(*srclient.SchemaRegistryClient)
var schema *srclient.Schema
var err error

if version > 0 {
schema, err = client.GetSchemaByVersionWithArbitrarySubject(subject, version)

} else {
schema, err = client.GetLatestSchemaWithArbitrarySubject(subject)
}

schema, err := client.GetLatestSchemaWithArbitrarySubject(subject)
if err != nil {
return diag.FromErr(err)
// return diag.FromErr(fmt.Errorf("unknown schema for subject '%s'", subject))
}

d.Set("schema", schema.Schema())
Expand Down
57 changes: 57 additions & 0 deletions schemaregistry/data_source_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,61 @@ func TestAccDataSourceSchemaReferences_basic(t *testing.T) {
},
},
})
}

func TestAccDataSourceSchema_atVersion(t *testing.T) {
// GIVEN
url, found := os.LookupEnv("SCHEMA_REGISTRY_URL")
if !found {
t.Fatalf("SCHEMA_REGISTRY_URL must be set for acceptance tests")
}
username := os.Getenv("SCHEMA_REGISTRY_USERNAME")
password := os.Getenv("SCHEMA_REGISTRY_PASSWORD")

client := srclient.CreateSchemaRegistryClient(url)
if (username != "") && (password != "") {
client.SetCredentials(username, password)
}

// AND
u, err := uuid.GenerateUUID()
if err != nil {
t.Fatal(err)
}

referencedSchemaSubject := fmt.Sprintf("referencedSub-%s", u)
referencedSchema := strings.Replace(fixtureAvro1, "\\", "", -1)
referencedSchemaLatest := strings.Replace(fixtureAvro2, "\\", "", -1)

// AND
if _, err = client.CreateSchemaWithArbitrarySubject(referencedSchemaSubject, referencedSchema, srclient.Avro); err != nil {
t.Fatalf("could not create schema for subject: %s, err: %s", referencedSchema, err)
}

if _, err = client.CreateSchemaWithArbitrarySubject(referencedSchemaSubject, referencedSchemaLatest, srclient.Avro); err != nil {
t.Fatalf("could not create schema for subject: %s, err: %s", referencedSchema, err)
}

// WHEN / THEN
resource.Test(t, resource.TestCase{
ProviderFactories: testAccProviders,
PreCheck: func() { testAccPreCheck(t) },
Steps: []resource.TestStep{
{
Config: fmt.Sprintf(`
data "schemaregistry_schema" "schemaAtVersion" {
subject = "%s"
version = 1
}
`, referencedSchemaSubject),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "id", referencedSchemaSubject),
resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "subject", referencedSchemaSubject),
resource.TestCheckResourceAttrSet("data.schemaregistry_schema.schemaAtVersion", "schema_id"),
resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "version", "1"),
resource.TestCheckResourceAttr("data.schemaregistry_schema.schemaAtVersion", "schema", referencedSchema),
),
},
},
})
}

0 comments on commit 5fa552a

Please sign in to comment.