Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow skipping topic creation #4616

Merged
merged 4 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.<br/>**It's only used when the provider is `kafka`**. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
Expand Down Expand Up @@ -287,9 +288,10 @@
| `wal` | -- | -- | -- |
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
Expand Down
8 changes: 7 additions & 1 deletion config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ provider = "raft_engine"
## The broker endpoints of the Kafka cluster.
broker_endpoints = ["127.0.0.1:9092"]

## Number of topics to be created upon start.
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
auto_create_topics = true

## Number of topics.
num_topics = 64

## Topic selector type.
Expand All @@ -108,6 +113,7 @@ num_topics = 64
selector_type = "round_robin"

## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
topic_name_prefix = "greptimedb_wal_topic"

## Expected number of replicas of each partition.
Expand Down
8 changes: 7 additions & 1 deletion config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ sync_period = "10s"
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

## Number of topics to be created upon start.
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
auto_create_topics = true

## Number of topics.
## **It's only used when the provider is `kafka`**.
num_topics = 64

Expand All @@ -182,6 +187,7 @@ num_topics = 64
selector_type = "round_robin"

## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ impl TopicManager {
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
// Skip creating topics.
if !self.config.auto_create_topics {
return Ok(());
}
let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });

Expand Down
2 changes: 2 additions & 0 deletions src/common/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
connection: config.connection,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
}),
}
}
Expand Down Expand Up @@ -188,6 +189,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
auto_create_topics: true,
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

Expand Down
2 changes: 1 addition & 1 deletion src/common/wal/src/config/kafka/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Default for KafkaConnectionConfig {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaTopicConfig {
/// Number of topics to be created upon start.
/// Number of topics.
pub num_topics: usize,
/// Number of partitions per topic.
pub num_partitions: i32,
Expand Down
4 changes: 4 additions & 0 deletions src/common/wal/src/config/kafka/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig {
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
// Automatically create topics for WAL.
pub auto_create_topics: bool,
// Create index for WAL.
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
Expand All @@ -54,6 +57,7 @@ impl Default for DatanodeKafkaConfig {
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
auto_create_topics: true,
create_index: true,
dump_index_interval: Duration::from_secs(60),
}
Expand Down
15 changes: 14 additions & 1 deletion src/common/wal/src/config/kafka/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::common::KafkaConnectionConfig;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};

/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvKafkaConfig {
/// The kafka connection config.
Expand All @@ -30,4 +30,17 @@ pub struct MetasrvKafkaConfig {
/// The kafka config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
// Automatically create topics for WAL.
pub auto_create_topics: bool,
}

impl Default for MetasrvKafkaConfig {
fn default() -> Self {
Self {
connection: Default::default(),
backoff: Default::default(),
kafka_topic: Default::default(),
auto_create_topics: true,
}
}
}