Skip to content

Commit

Permalink
[databases]: add support for Kafka advanced configuration (#1587)
Browse files Browse the repository at this point in the history
* [databases]: add support for Kafka advanced configuration

* fix indention in test file

* Update commands/displayers/database.go

Co-authored-by: Andrew Starr-Bochicchio <[email protected]>

* Update commands/displayers/database.go

Co-authored-by: Andrew Starr-Bochicchio <[email protected]>

---------

Co-authored-by: Andrew Starr-Bochicchio <[email protected]>
  • Loading branch information
loosla and andrewsomething authored Sep 26, 2024
1 parent 84a6e48 commit de1fb73
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 6 deletions.
22 changes: 20 additions & 2 deletions commands/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -2443,6 +2443,7 @@ This command functions as a PATCH request, meaning that only the specified field
displayerType(&displayers.PostgreSQLConfiguration{}),
displayerType(&displayers.RedisConfiguration{}),
displayerType(&displayers.MongoDBConfiguration{}),
displayerType(&displayers.KafkaConfiguration{}),
)
AddStringFlag(
getDatabaseCfgCommand,
Expand Down Expand Up @@ -2501,9 +2502,10 @@ func RunDatabaseConfigurationGet(c *CmdConfig) error {
"pg": nil,
"redis": nil,
"mongodb": nil,
"kafka": nil,
}
if _, ok := allowedEngines[engine]; !ok {
return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb'", c.NS)
return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb', 'kafka'", c.NS)
}

dbId := args[0]
Expand Down Expand Up @@ -2547,6 +2549,16 @@ func RunDatabaseConfigurationGet(c *CmdConfig) error {
MongoDBConfig: *config,
}
return c.Display(&displayer)
} else if engine == "kafka" {
config, err := c.Databases().GetKafkaConfiguration(dbId)
if err != nil {
return err
}

displayer := displayers.KafkaConfiguration{
KafkaConfig: *config,
}
return c.Display(&displayer)
}

return nil
Expand All @@ -2571,9 +2583,10 @@ func RunDatabaseConfigurationUpdate(c *CmdConfig) error {
"pg": nil,
"redis": nil,
"mongodb": nil,
"kafka": nil,
}
if _, ok := allowedEngines[engine]; !ok {
return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb'", c.NS)
return fmt.Errorf("(%s) command: engine must be one of: 'pg', 'mysql', 'redis', 'mongodb', 'kafka'", c.NS)
}

configJson, err := c.Doit.GetString(c.NS, doctl.ArgDatabaseConfigJson)
Expand Down Expand Up @@ -2602,6 +2615,11 @@ func RunDatabaseConfigurationUpdate(c *CmdConfig) error {
if err != nil {
return err
}
} else if engine == "kafka" {
err := c.Databases().UpdateKafkaConfiguration(dbId, configJson)
if err != nil {
return err
}
}

return nil
Expand Down
24 changes: 24 additions & 0 deletions commands/databases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ var (
MongoDBConfig: &godo.MongoDBConfig{},
}

testKafkaConfiguration = do.KafkaConfig{
KafkaConfig: &godo.KafkaConfig{},
}

topicReplicationFactor = uint32(3)
testKafkaTopic = do.DatabaseTopic{
DatabaseTopic: &godo.DatabaseTopic{
Expand Down Expand Up @@ -1666,6 +1670,16 @@ func TestDatabaseConfigurationGet(t *testing.T) {
assert.NoError(t, err)
})

withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
tm.databases.EXPECT().GetKafkaConfiguration(testDBCluster.ID).Return(&testKafkaConfiguration, nil)
config.Args = append(config.Args, testDBCluster.ID)
config.Doit.Set(config.NS, doctl.ArgDatabaseEngine, "kafka")

err := RunDatabaseConfigurationGet(config)

assert.NoError(t, err)
})

withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
err := RunDatabaseConfigurationGet(config)

Expand Down Expand Up @@ -1730,6 +1744,16 @@ func TestDatabaseConfigurationUpdate(t *testing.T) {
assert.NoError(t, err)
})

withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
tm.databases.EXPECT().UpdateKafkaConfiguration(testDBCluster.ID, "").Return(nil)
config.Args = append(config.Args, testDBCluster.ID)
config.Doit.Set(config.NS, doctl.ArgDatabaseEngine, "kafka")

err := RunDatabaseConfigurationUpdate(config)

assert.NoError(t, err)
})

withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
err := RunDatabaseConfigurationUpdate(config)

Expand Down
133 changes: 133 additions & 0 deletions commands/displayers/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,139 @@ func (dc *MongoDBConfiguration) KV() []map[string]any {
return o
}

type KafkaConfiguration struct {
KafkaConfig do.KafkaConfig
}

var _ Displayable = &KafkaConfiguration{}

func (dc *KafkaConfiguration) JSON(out io.Writer) error {
return writeJSON(dc.KafkaConfig, out)
}

func (dc *KafkaConfiguration) Cols() []string {
return []string{
"key",
"value",
}
}

func (dc *KafkaConfiguration) ColMap() map[string]string {
return map[string]string{
"key": "key",
"value": "value",
}
}

func (dc *KafkaConfiguration) KV() []map[string]any {
c := dc.KafkaConfig
o := []map[string]any{}
if c.GroupInitialRebalanceDelayMs != nil {
o = append(o, map[string]any{
"key": "GroupInitialRebalanceDelayMs",
"value": *c.GroupInitialRebalanceDelayMs,
})
}
if c.GroupMinSessionTimeoutMs != nil {
o = append(o, map[string]any{
"key": "GroupMinSessionTimeoutMs",
"value": *c.GroupMinSessionTimeoutMs,
})
}
if c.GroupMaxSessionTimeoutMs != nil {
o = append(o, map[string]any{
"key": "GroupMaxSessionTimeoutMs",
"value": *c.GroupMaxSessionTimeoutMs,
})
}
if c.MessageMaxBytes != nil {
o = append(o, map[string]any{
"key": "MessageMaxBytes",
"value": *c.MessageMaxBytes,
})
}
if c.LogCleanerDeleteRetentionMs != nil {
o = append(o, map[string]any{
"key": "LogCleanerDeleteRetentionMs",
"value": *c.LogCleanerDeleteRetentionMs,
})
}
if c.LogCleanerMinCompactionLagMs != nil {
o = append(o, map[string]any{
"key": "LogCleanerMinCompactionLagMs",
"value": *c.LogCleanerMinCompactionLagMs,
})
}
if c.LogFlushIntervalMs != nil {
o = append(o, map[string]any{
"key": "LogFlushIntervalMs",
"value": *c.LogFlushIntervalMs,
})
}
if c.LogIndexIntervalBytes != nil {
o = append(o, map[string]any{
"key": "LogIndexIntervalBytes",
"value": *c.LogIndexIntervalBytes,
})
}
if c.LogMessageDownconversionEnable != nil {
o = append(o, map[string]any{
"key": "LogMessageDownconversionEnable",
"value": *c.LogMessageDownconversionEnable,
})
}
if c.LogMessageTimestampDifferenceMaxMs != nil {
o = append(o, map[string]any{
"key": "LogMessageTimestampDifferenceMaxMs",
"value": *c.LogMessageTimestampDifferenceMaxMs,
})
}
if c.LogPreallocate != nil {
o = append(o, map[string]any{
"key": "LogPreallocate",
"value": *c.LogPreallocate,
})
}
if c.LogRetentionBytes != nil {
o = append(o, map[string]any{
"key": "LogRetentionBytes",
"value": c.LogRetentionBytes.String(),
})
}
if c.LogRetentionHours != nil {
o = append(o, map[string]any{
"key": "LogRetentionHours",
"value": *c.LogRetentionHours,
})
}
if c.LogRetentionMs != nil {
o = append(o, map[string]any{
"key": "LogRetentionMs",
"value": c.LogRetentionMs.String(),
})
}
if c.LogRollJitterMs != nil {
o = append(o, map[string]any{
"key": "LogRollJitterMs",
"value": *c.LogRollJitterMs,
})
}
if c.LogSegmentDeleteDelayMs != nil {
o = append(o, map[string]any{
"key": "LogSegmentDeleteDelayMs",
"value": *c.LogSegmentDeleteDelayMs,
})
}
if c.AutoCreateTopicsEnable != nil {
o = append(o, map[string]any{
"key": "AutoCreateTopicsEnable",
"value": *c.AutoCreateTopicsEnable,
})
}

return o
}

type DatabaseEvents struct {
DatabaseEvents do.DatabaseEvents
}
Expand Down
33 changes: 33 additions & 0 deletions do/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ type MongoDBConfig struct {
*godo.MongoDBConfig
}

// KafkaConfig is a wrapper for godo.KafkaConfig
type KafkaConfig struct {
*godo.KafkaConfig
}

// DatabaseTopics is a slice of DatabaseTopic
type DatabaseTopics []DatabaseTopic

Expand Down Expand Up @@ -206,11 +211,13 @@ type DatabasesService interface {
GetPostgreSQLConfiguration(databaseID string) (*PostgreSQLConfig, error)
GetRedisConfiguration(databaseID string) (*RedisConfig, error)
GetMongoDBConfiguration(databaseID string) (*MongoDBConfig, error)
GetKafkaConfiguration(databaseID string) (*KafkaConfig, error)

UpdateMySQLConfiguration(databaseID string, confString string) error
UpdatePostgreSQLConfiguration(databaseID string, confString string) error
UpdateRedisConfiguration(databaseID string, confString string) error
UpdateMongoDBConfiguration(databaseID string, confString string) error
UpdateKafkaConfiguration(databaseID string, confString string) error

ListTopics(string) (DatabaseTopics, error)
GetTopic(string, string) (*DatabaseTopic, error)
Expand Down Expand Up @@ -713,6 +720,17 @@ func (ds *databasesService) GetMongoDBConfiguration(databaseID string) (*MongoDB
}, nil
}

func (ds *databasesService) GetKafkaConfiguration(databaseID string) (*KafkaConfig, error) {
cfg, _, err := ds.client.Databases.GetKafkaConfig(context.TODO(), databaseID)
if err != nil {
return nil, err
}

return &KafkaConfig{
KafkaConfig: cfg,
}, nil
}

func (ds *databasesService) UpdateMySQLConfiguration(databaseID string, confString string) error {
var conf godo.MySQLConfig
err := json.Unmarshal([]byte(confString), &conf)
Expand Down Expand Up @@ -773,6 +791,21 @@ func (ds *databasesService) UpdateMongoDBConfiguration(databaseID string, confSt
return nil
}

func (ds *databasesService) UpdateKafkaConfiguration(databaseID string, confString string) error {
var conf godo.KafkaConfig
err := json.Unmarshal([]byte(confString), &conf)
if err != nil {
return err
}

_, err = ds.client.Databases.UpdateKafkaConfig(context.TODO(), databaseID, &conf)
if err != nil {
return err
}

return nil
}

func (ds *databasesService) ListTopics(databaseID string) (DatabaseTopics, error) {
f := func(opt *godo.ListOptions) ([]any, *godo.Response, error) {
list, resp, err := ds.client.Databases.ListTopics(context.TODO(), databaseID, opt)
Expand Down
29 changes: 29 additions & 0 deletions do/mocks/DatabasesService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/digitalocean/godo v1.125.0 h1:wGPBQRX9Wjo0qCF0o8d25mT3A84Iw8rfHnZOPyvHcMQ=
github.com/digitalocean/godo v1.125.0/go.mod h1:PU8JB6I1XYkQIdHFop8lLAY9ojp6M0XcU0TWaQSxbrc=
github.com/digitalocean/godo v1.125.1-0.20240925184037-40ea734536f0 h1:hEi5W+TPrYUjq1PLt1lJmhrt+ezpzUrAvwYr9f1Xo4U=
github.com/digitalocean/godo v1.125.1-0.20240925184037-40ea734536f0/go.mod h1:PU8JB6I1XYkQIdHFop8lLAY9ojp6M0XcU0TWaQSxbrc=
github.com/digitalocean/godo v1.126.0 h1:+Znh7VMQj/E8ArbjWnc7OKGjWfzC+I8OCSRp7r1MdD8=
github.com/digitalocean/godo v1.126.0/go.mod h1:PU8JB6I1XYkQIdHFop8lLAY9ojp6M0XcU0TWaQSxbrc=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
Expand Down
Loading

0 comments on commit de1fb73

Please sign in to comment.