Add sharding and replication guide (#34)
* Add sharding and replication guide

Signed-off-by: Pavol Loffay <[email protected]>

* Revert

Signed-off-by: Pavol Loffay <[email protected]>

* typos

Signed-off-by: Pavol Loffay <[email protected]>

* typo

Signed-off-by: Pavol Loffay <[email protected]>

* Rename to _local

Signed-off-by: Pavol Loffay <[email protected]>

* use if not exists

Signed-off-by: Pavol Loffay <[email protected]>

* make kubectl easier to use

Signed-off-by: Pavol Loffay <[email protected]>

* Some fixes

Signed-off-by: Pavol Loffay <[email protected]>

* some fixes

Signed-off-by: Pavol Loffay <[email protected]>
pavolloffay authored Jul 27, 2021
1 parent eceb1d2 commit d67bbb6
Showing 8 changed files with 203 additions and 7 deletions.
8 changes: 7 additions & 1 deletion config.yaml
@@ -18,4 +18,10 @@ password:
 # Database name. Default is "default".
 database:
 # Endpoint for scraping prometheus metrics. Default localhost:9090
-metrics_endpoint: localhost:9090
+metrics_endpoint: localhost:9090
+# Table with spans. Default "jaeger_spans_local".
+spans_table:
+# Span index table. Default "jaeger_index_local".
+spans_index_table:
+# Operations table. Default "jaeger_operations_local".
+operations_table:
171 changes: 171 additions & 0 deletions sharding-and-replication.md
@@ -0,0 +1,171 @@
# Sharding and Replication

This guide explains how to set up sharding and replication for Jaeger data.
It uses the [clickhouse-operator](https://github.com/Altinity/clickhouse-operator) to deploy
the storage.

## Sharding

Sharding splits the data across multiple ClickHouse nodes to
increase throughput and decrease latency.
It relies on the `Distributed` table engine, which is backed by local tables on each node.
A distributed table is a "virtual" table that stores no data itself; it serves only as
an interface for inserting and querying data.

To set up sharding, run the following statements on all nodes in the cluster.
The local tables have to be created on the nodes before the distributed tables.

```sql
CREATE TABLE IF NOT EXISTS jaeger_spans AS jaeger_spans_local ENGINE = Distributed('{cluster}', default, jaeger_spans_local, cityHash64(traceID));
CREATE TABLE IF NOT EXISTS jaeger_index AS jaeger_index_local ENGINE = Distributed('{cluster}', default, jaeger_index_local, cityHash64(traceID));
CREATE TABLE IF NOT EXISTS jaeger_operations AS jaeger_operations_local ENGINE = Distributed('{cluster}', default, jaeger_operations_local, rand());
```

* The `AS <table-name>` clause creates a table with the same schema as the specified one.
* The `Distributed` engine takes the cluster, database, table name, and sharding key as parameters.

If the distributed tables are not created on all ClickHouse nodes, Jaeger queries fail to retrieve data from the storage.
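
The setup can be verified from `clickhouse-client` on each node. A quick sanity check (a sketch using standard ClickHouse system tables; it assumes the tables live in the `default` database):

```sql
-- Nodes that belong to the cluster referenced by the '{cluster}' macro.
SELECT cluster, shard_num, replica_num, host_name FROM system.clusters;

-- Confirm the distributed tables exist on this node.
SELECT name, engine FROM system.tables
WHERE database = 'default'
  AND name IN ('jaeger_spans', 'jaeger_index', 'jaeger_operations');
```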

### Deploy ClickHouse

Deploy ClickHouse with two shards:

```bash
cat <<EOF | kubectl apply -f -
apiVersion: "clickhouse.altinity.com/v1"
kind: "ClickHouseInstallation"
metadata:
  name: jaeger
spec:
  configuration:
    clusters:
      - name: cluster1
        layout:
          shardsCount: 2
EOF
```

Use the following command to run `clickhouse-client` on a ClickHouse node and create the distributed tables (repeat for each node, adjusting the pod name):
```bash
kubectl exec -it statefulset.apps/chi-jaeger-cluster1-0-0 -- clickhouse-client
```
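
Inside the client, it is worth confirming that the operator generated the macros used by the DDL statements (a quick check; `system.macros` is a standard ClickHouse table):

```sql
-- Values substituted for '{cluster}', '{shard}' and '{replica}' in DDL.
SELECT * FROM system.macros;
```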

### Plugin configuration

The plugin has to be configured to write and read data from the distributed tables:

```yaml
address: tcp://clickhouse-jaeger:9000
spans_table: jaeger_spans
spans_index_table: jaeger_index
operations_table: jaeger_operations
```
## Replication

Replication, as the name suggests, automatically replicates the data across multiple ClickHouse nodes.
It is used to achieve high availability, load scaling, and safe migrations/upgrades.
Replication requires ZooKeeper; refer to the clickhouse-operator documentation for how to deploy it.
ZooKeeper also allows using `ON CLUSTER` statements to replicate table creation across all nodes.
Therefore the following commands need to be run on only a single ClickHouse node:

```sql
CREATE TABLE IF NOT EXISTS jaeger_spans_local ON CLUSTER '{cluster}' (
timestamp DateTime CODEC(Delta, ZSTD(1)),
traceID String CODEC(ZSTD(1)),
model String CODEC(ZSTD(3))
) ENGINE ReplicatedMergeTree('/clickhouse/tables/{shard}/jaeger_spans', '{replica}')
PARTITION BY toDate(timestamp)
ORDER BY traceID
SETTINGS index_granularity=1024;
CREATE TABLE IF NOT EXISTS jaeger_index_local ON CLUSTER '{cluster}' (
timestamp DateTime CODEC(Delta, ZSTD(1)),
traceID String CODEC(ZSTD(1)),
service LowCardinality(String) CODEC(ZSTD(1)),
operation LowCardinality(String) CODEC(ZSTD(1)),
durationUs UInt64 CODEC(ZSTD(1)),
tags Array(String) CODEC(ZSTD(1)),
INDEX idx_tags tags TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX idx_duration durationUs TYPE minmax GRANULARITY 1
) ENGINE ReplicatedMergeTree('/clickhouse/tables/{shard}/jaeger_index', '{replica}')
PARTITION BY toDate(timestamp)
ORDER BY (service, -toUnixTimestamp(timestamp))
SETTINGS index_granularity=1024;
CREATE MATERIALIZED VIEW IF NOT EXISTS jaeger_operations_local ON CLUSTER '{cluster}'
ENGINE ReplicatedMergeTree('/clickhouse/tables/{shard}/jaeger_operations', '{replica}')
PARTITION BY toYYYYMM(date) ORDER BY (date, service, operation)
SETTINGS index_granularity=32
POPULATE
AS SELECT
toDate(timestamp) AS date,
service,
operation,
count() as count
FROM jaeger_index_local
GROUP BY date, service, operation;
CREATE TABLE IF NOT EXISTS jaeger_spans ON CLUSTER '{cluster}' AS jaeger_spans_local ENGINE = Distributed('{cluster}', default, jaeger_spans_local, cityHash64(traceID));
CREATE TABLE IF NOT EXISTS jaeger_index ON CLUSTER '{cluster}' AS jaeger_index_local ENGINE = Distributed('{cluster}', default, jaeger_index_local, cityHash64(traceID));
CREATE TABLE IF NOT EXISTS jaeger_operations ON CLUSTER '{cluster}' AS jaeger_operations_local ENGINE = Distributed('{cluster}', default, jaeger_operations_local, rand());
```
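
After the tables are created, replication health can be inspected through the `system.replicas` table (a sketch; a healthy replica is not read-only and has a small queue):

```sql
SELECT database, table, is_readonly, absolute_delay, queue_size
FROM system.replicas;
```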

### Deploy ClickHouse

Before deploying ClickHouse, make sure ZooKeeper is running in the `zoo1ns` namespace.

Deploy ClickHouse with three shards and two replicas. In total, the ClickHouse operator will deploy six pods:

```bash
cat <<EOF | kubectl apply -f -
apiVersion: "clickhouse.altinity.com/v1"
kind: "ClickHouseInstallation"
metadata:
  name: jaeger
spec:
  configuration:
    zookeeper:
      nodes:
        - host: zookeeper.zoo1ns
    clusters:
      - name: cluster1
        layout:
          shardsCount: 3
          replicasCount: 2
  templates:
    podTemplates:
      - name: clickhouse-with-empty-dir-volume-template
        spec:
          containers:
            - name: clickhouse-pod
              image: yandex/clickhouse-server:20.7
              volumeMounts:
                - name: clickhouse-storage
                  mountPath: /var/lib/clickhouse
          volumes:
            - name: clickhouse-storage
              emptyDir:
                medium: "" # accepted values: empty str (means node's default medium) or "Memory"
                sizeLimit: 1Gi
EOF
```

## Useful Commands

### SQL

```sql
SHOW TABLES;
SELECT count() FROM jaeger_spans;
```

### Kubectl

```bash
kubectl port-forward service/clickhouse-jaeger 9000:9000
kubectl delete clickhouseinstallations.clickhouse.altinity.com jaeger
```
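
When debugging sharding, it also helps to compare per-node and global row counts; the `_local` table holds only the current node's shard, while the distributed table fans out to all shards:

```sql
SELECT count() FROM jaeger_spans_local; -- rows stored on this node only
SELECT count() FROM jaeger_spans;       -- rows across all shards
```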
2 changes: 1 addition & 1 deletion sqlscripts/0001-jaeger-index.sql
@@ -1,4 +1,4 @@
-CREATE TABLE IF NOT EXISTS jaeger_index (
+CREATE TABLE IF NOT EXISTS jaeger_index_local (
   timestamp DateTime CODEC(Delta, ZSTD(1)),
   traceID String CODEC(ZSTD(1)),
   service LowCardinality(String) CODEC(ZSTD(1)),
2 changes: 1 addition & 1 deletion sqlscripts/0002-jaeger-spans.sql
@@ -1,4 +1,4 @@
-CREATE TABLE IF NOT EXISTS jaeger_spans (
+CREATE TABLE IF NOT EXISTS jaeger_spans_local (
   timestamp DateTime CODEC(Delta, ZSTD(1)),
   traceID String CODEC(ZSTD(1)),
   model String CODEC(ZSTD(3))
2 changes: 1 addition & 1 deletion sqlscripts/0003-jaeger-operations.sql
@@ -1,4 +1,4 @@
-CREATE MATERIALIZED VIEW IF NOT EXISTS jaeger_operations
+CREATE MATERIALIZED VIEW IF NOT EXISTS jaeger_operations_local
 ENGINE SummingMergeTree
 PARTITION BY toYYYYMM(date) ORDER BY (date, service, operation)
 SETTINGS index_granularity=32
2 changes: 1 addition & 1 deletion sqlscripts/0004-jaeger-spans-archive.sql
@@ -1,4 +1,4 @@
-CREATE TABLE IF NOT EXISTS jaeger_archive_spans (
+CREATE TABLE IF NOT EXISTS jaeger_archive_spans_local (
   timestamp DateTime CODEC(Delta, ZSTD(1)),
   traceID String CODEC(ZSTD(1)),
   model String CODEC(ZSTD(3))
19 changes: 19 additions & 0 deletions storage/config.go
@@ -12,6 +12,10 @@ const (
 	defaultUsername        = "default"
 	defaultDatabaseName    = "default"
 	defaultMetricsEndpoint = "localhost:9090"
+
+	defaultSpansTable      = "jaeger_spans_local"
+	defaultSpansIndexTable = "jaeger_index_local"
+	defaultOperationsTable = "jaeger_operations_local"
 )

type Configuration struct {
@@ -37,6 +41,12 @@ type Configuration struct {
 	Database string `yaml:"database"`
 	// Endpoint for scraping prometheus metrics e.g. localhost:9090.
 	MetricsEndpoint string `yaml:"metrics_endpoint"`
+	// Table with spans. Default "jaeger_spans_local".
+	SpansTable string `yaml:"spans_table"`
+	// Span index table. Default "jaeger_index_local".
+	SpansIndexTable string `yaml:"spans_index_table"`
+	// Operations table. Default "jaeger_operations_local".
+	OperationsTable string `yaml:"operations_table"`
 }

func (cfg *Configuration) setDefaults() {
@@ -58,4 +68,13 @@ func (cfg *Configuration) setDefaults() {
 	if cfg.MetricsEndpoint == "" {
 		cfg.MetricsEndpoint = defaultMetricsEndpoint
 	}
+	if cfg.SpansTable == "" {
+		cfg.SpansTable = defaultSpansTable
+	}
+	if cfg.SpansIndexTable == "" {
+		cfg.SpansIndexTable = defaultSpansIndexTable
+	}
+	if cfg.OperationsTable == "" {
+		cfg.OperationsTable = defaultOperationsTable
+	}
 }
4 changes: 2 additions & 2 deletions storage/store.go
@@ -51,8 +51,8 @@ func NewStore(logger hclog.Logger, cfg Configuration, embeddedSQLScripts embed.F
 	}
 	return &Store{
 		db: db,
-		writer: clickhousespanstore.NewSpanWriter(logger, db, "jaeger_index", "jaeger_spans", clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize),
-		reader: clickhousespanstore.NewTraceReader(db, "jaeger_operations", "jaeger_index", "jaeger_spans"),
+		writer: clickhousespanstore.NewSpanWriter(logger, db, cfg.SpansIndexTable, cfg.SpansTable, clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize),
+		reader: clickhousespanstore.NewTraceReader(db, cfg.OperationsTable, cfg.SpansIndexTable, cfg.SpansTable),
 		archiveWriter: clickhousespanstore.NewSpanWriter(logger, db, "", "jaeger_archive_spans", clickhousespanstore.Encoding(cfg.Encoding), cfg.BatchFlushInterval, cfg.BatchWriteSize),
 		archiveReader: clickhousespanstore.NewTraceReader(db, "", "", "jaeger_archive_spans"),
 	}, nil
