Batch too large #90

Closed
Codebreaker101 opened this issue Nov 16, 2021 · 6 comments

@Codebreaker101

What version of Scylla or Cassandra are you using?

Scylla 4.4.6-0.20211028.dd018d4de

What version of Gocql are you using?

v1.5.0

What version of Go are you using?

1.17.1 linux/amd64

What did you do?

I watched a couple of talks about inserting 1 million metrics a second into Cassandra/Scylla. I'm trying to replicate the result in Go and have hit a wall. In the mentioned talks it is said that, with the settings I have set, I would be able to have 10000 queries in one batch. After altering the demo code to use just one partition, the warning goes away. KairosDB, which is mentioned in the talk, creates batches based on their destination node/shard. To my knowledge this is not available in the gocql driver (#75), unlike the DataStax driver for Java.

Did I miss anything? Is it possible to push 1 million requests per second with Go?

What did you expect to see?

I expected to be able to push 10000 queries in one batch into Scylla.

What did you see instead?

scylladb-scylla1-1  | WARN  2021-11-16 15:43:49,980 [shard 0] BatchStatement - Batch modifying 602 partitions in test.data_points is of size 153510 bytes, exceeding specified WARN threshold of 152576 by 934.
scylladb-scylla1-1  | ERROR 2021-11-16 15:56:14,120 [shard 0] BatchStatement - Batch modifying 603 partitions in test.data_points is of size 153765 bytes, exceeding specified FAIL threshold of 153600 by 165.
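
(For reference, the logged thresholds match the scylla.yaml settings below: batch_size_warn_threshold_in_kb: 149 → 149 × 1024 = 152576 bytes, and batch_size_fail_threshold_in_kb: 150 → 150 × 1024 = 153600 bytes.)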

Steps to Replicate

scylla.yaml

num_tokens: 256
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
commitlog_segment_size_in_mb: 32
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "127.0.0.1"
listen_address: localhost
native_transport_port: 9042
native_shard_aware_transport_port: 19042
read_request_timeout_in_ms: 5000
write_request_timeout_in_ms: 2000
cas_contention_timeout_in_ms: 1000
endpoint_snitch: SimpleSnitch
rpc_address: localhost
rpc_port: 9160
api_port: 10000
api_address: 127.0.0.1
batch_size_warn_threshold_in_kb: 149
batch_size_fail_threshold_in_kb: 150
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
commitlog_total_space_in_mb: -1
murmur3_partitioner_ignore_msb_bits: 12
api_ui_dir: /opt/scylladb/swagger-ui/dist/
api_doc_dir: /opt/scylladb/api/api-doc/
native_transport_max_threads: 2000

docker-compose.yaml

version: "3"

services:
  scylla1:
    image: scylladb/scylla
    command: --seeds=scylla1,scylla2 --smp 1 --memory 2048M --overprovisioned 1 --api-address 0.0.0.0
    ports:
      - 9042:9042
      - 19042:19042
    volumes:
      - "./scylla/scylla.yaml:/etc/scylla/scylla.yaml"
    networks:
      web:

  scylla2:
    image: scylladb/scylla
    command: --seeds=scylla1,scylla2 --smp 1 --memory 2048M --overprovisioned 1 --api-address 0.0.0.0
    ports:
      - 9043:9042
      - 19043:19042
    volumes:
      - "./scylla/scylla.yaml:/etc/scylla/scylla.yaml"
    networks:
      web:

  scylla3:
    image: scylladb/scylla
    command: --seeds=scylla1,scylla2 --smp 1 --memory 2048M --overprovisioned 1 --api-address 0.0.0.0
    ports:
      - 9044:9042
      - 19044:19042
    volumes:
      - "./scylla/scylla.yaml:/etc/scylla/scylla.yaml"
    networks:
      web:

networks:
  web:
    driver: bridge
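
With these two files in place, the three-node cluster is brought up with something like docker-compose up -d (or docker compose up -d on newer Docker versions), and the keyspace and table from the cqlsh block in the Go comment below are created before running the test.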

GO

// NOTE: package clause added so the snippet compiles as a standalone _test.go file; the name is assumed.
package scylla_test

import (
	"fmt"
	"testing"
	"time"

	"github.com/gocql/gocql"
	"github.com/gocql/gocql/lz4"
)

/*
// go.mod
replace github.com/gocql/gocql => github.com/scylladb/gocql v1.5.0

// scylla.yaml
batch_size_warn_threshold_in_kb: 149
batch_size_fail_threshold_in_kb: 150
native_transport_max_threads: 2000

// cqlsh
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS data_points (
  metric text,
  row_time timestamp,
  data_type text,
  tags frozen<map<text,text>>,
  offset int,
  value blob,
  PRIMARY KEY ((metric, row_time, data_type, tags), offset)
);
*/

// ThreeWeeks is the row-time bucket width in milliseconds (21 * 24 * 60 * 60 * 1000).
const ThreeWeeks = 1814400000

type ScyllaConfig struct {
	Keyspace  string
	Hosts     []string
	BatchSize int
}

func TestBatchManual(t *testing.T) {
	config := ScyllaConfig{
		Keyspace:  "test",
		Hosts:     []string{"localhost:9042", "localhost:9043", "localhost:9044"},
		BatchSize: 602,
	}

	clusterConfig := gocql.NewCluster(config.Hosts...)
	clusterConfig.Keyspace = config.Keyspace
	clusterConfig.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
	clusterConfig.Compressor = lz4.LZ4Compressor{}
	session, err := gocql.NewSession(*clusterConfig)
	if err != nil {
		t.Fatal(err)
	}
	defer session.Close()

	batch := session.NewBatch(gocql.UnloggedBatch)
	for i := 0; i < config.BatchSize; i++ {
		now := time.Now().UnixMilli()
		offset := now % ThreeWeeks
		rowTime := now - offset

		// Different partitions: the tags map changes every iteration, so each
		// statement targets its own partition and the batch spans ~BatchSize partitions.
		batch.Query(
			"INSERT INTO data_points (metric,row_time,data_type,tags,offset,value) VALUES (?, ?, ?, ?, ?, ?);",
			"metric", rowTime, "int", map[string]string{"name": fmt.Sprintf("%d", i)}, offset, []byte{byte(i / 256), byte(i % 256)})

		// Same partition: identical partition key, only the clustering column (offset)
		// varies. With this variant the "batch too large" warning goes away.
		// batch.Query(
		// 	"INSERT INTO data_points (metric,row_time,data_type,tags,offset,value) VALUES (?, ?, ?, ?, ?, ?);",
		// 	"metric", rowTime, "int", map[string]string{"name": "value"}, offset+1000*int64(i), []byte{byte(i / 256), byte(i % 256)})
	}

	if err := session.ExecuteBatch(batch); err != nil {
		t.Fatal(err)
	}
}
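
Assuming the snippet above is saved as a _test.go file in a module with the replace directive from the comment applied, the reproduction is run with the standard tooling:

go test -run TestBatchManual -v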
@mmatczuk

Two notes from my end.

  1. There is a stalled PR to add partitioned batches to upstream gocql: apache/cassandra-gocql-driver#1307 (Partitioned batches).
  2. On the Scylla side, the code to get the shard from a token is not exported, but it exists: https://github.com/scylladb/gocql/blob/master/scylla.go#L313

@Codebreaker101
Author

@mmatczuk Thanks for the reply.

I found a way to get the host from the query:

stmt, args := buildQuery(metric)
query := scylla.session.Query(stmt, args...)
hostID := scylla.selector.Pick(query)().Info().HostID()

And then I batch according to the host.
I'm guessing one could squeeze out more performance by also batching per shard, but that will have to wait for a better time.
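
Roughly, the grouping looks like this (a minimal sketch: stmtWithArgs and batchPerHost are hypothetical helpers, and selector is assumed to be the same TokenAwareHostPolicy instance that was set on the ClusterConfig, so it is already initialized by the session):

// stmtWithArgs pairs a CQL statement with its bind values (hypothetical helper type).
type stmtWithArgs struct {
	stmt string
	args []interface{}
}

// batchPerHost splits statements into one unlogged batch per target host,
// picked via the token-aware host selection policy.
func batchPerHost(session *gocql.Session, selector gocql.HostSelectionPolicy, stmts []stmtWithArgs) error {
	batches := make(map[string]*gocql.Batch)
	for _, s := range stmts {
		q := session.Query(s.stmt, s.args...)
		// Pick returns a NextHost iterator; the first pick is the token-aware
		// choice for this statement's partition key.
		selected := selector.Pick(q)()
		if selected == nil {
			// No host could be picked (e.g. no routing info yet); a real
			// implementation would execute this statement individually.
			continue
		}
		hostID := selected.Info().HostID()
		b, ok := batches[hostID]
		if !ok {
			b = session.NewBatch(gocql.UnloggedBatch)
			batches[hostID] = b
		}
		b.Query(s.stmt, s.args...)
	}
	for _, b := range batches {
		if err := session.ExecuteBatch(b); err != nil {
			return err
		}
	}
	return nil
}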

[Screenshot from 2021-11-18 09-23-06]

NOTE: Consistency ONE, Replication Factor 1

@mmatczuk

Cool.

@mmatczuk

I opened an issue in gocqlx. I invite you to contribute the code that does the splitting to the dbutil package under gocqlx.

@martin-sucha

Nice!

I was wondering whether using ShuffleHosts or something similar could cause problems, but even if the host selection policy picks a non-primary replica, the Scylla node should still have all the data to serve the batch locally.

Still, I'd like to make the host/shard available to users with a proper API somehow - it would also help with implementing host selection policies outside the gocql package.

@dkropachev
Collaborator

The original issue is solved.
The shard API issue is tracked in #200.
