Skip to content

sdojjy/tikv-coprocessor-client

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

tikv-coprocessor-client

This client is based on client-go, it's designed to test tikv coprocessor directly.

Feature

  • Insert table record and index record into tikv using tikv grpc interface.
  • Send coprocessor request to tikv and parse the response.

Init the client

To init the coprocessor client we need the pd address.

client, err := coprocessor.NewClient([]string{"127.0.0.1:2379"}, config.Security{})
if err != nil {
	t.Fatal("create client failed")
}

Insert data to tikv

Before you insert the data, you need a table id and index id, you can assign it manually or get it by calling CopClient.GetTableInfo method Examples see: record_test

Send Coprocessor Request

Check comment in cop file, that file defined a lot methods you can call to send coprocessor request. Example: see cop_test

Examples

  • Insert data to a table with index.
package main

import (
	"github.com/pingcap/tidb/config"
	"github.com/pingcap/tidb/types"
	"github.com/prometheus/common/log"
	"github.com/sdojjy/tikv-coprocessor-client/coprocessor"
)

func main() {
	client, err := coprocessor.NewClient([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	//create table t2 (id int , name varchar(25), key `name` (`name`));
	tableInfo, _ := client.GetTableInfo("test", "t2")
	rawData := []types.Datum{types.NewIntDatum(1), types.NewStringDatum("jerry")}
	log.Info(client.AddTableRecord(tableInfo.ID, 1, rawData))

	indexData := []types.Datum{types.NewStringDatum("jerry")}
	log.Info(client.AddIndexRecord(tableInfo.ID, tableInfo.Indices[0].ID, 1, indexData, false))
	//result
	/**
	  mysql>  select name from t2 where name in ('jerry','ttt');
	  +-------+
	  | name  |
	  +-------+
	  | jerry |
	  +-------+
	*/
}
  • Send table scan with conditions
package main

import (
	"context"
	"fmt"
	"github.com/pingcap/tidb/config"
	"github.com/pingcap/tidb/types"
	"github.com/prometheus/common/log"
	"github.com/sdojjy/tikv-coprocessor-client/coprocessor"
)

func main() {
	c, err := coprocessor.NewClient([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	//create table t2 (id int , name varchar(25), key `name` (`name`));
	tableInfo, _ := c.GetTableInfo("test", "t2")
	ret, _ := c.ScanTableWithConditionsAndTableInfo(context.Background(), tableInfo)
	printDatum(ret)
	fmt.Println("mock table info scan")
	ret, _ = c.ScanTableWithConditions(context.Background(), coprocessor.InnerTableInfoToMockTableInfo(tableInfo), "id!=1")
	printDatum(ret)
}

func printDatum(values [][]types.Datum) {
	for _, row := range values {
		for _, col := range row {
			str, _ := col.ToString()
			fmt.Printf("%s\t", str)
		}
		fmt.Println()
	}
}
  • Send Coprocessor executors
    ... 
	agg, err := c.GenAggExprPB(ast.AggFuncMax, []expression.Expression{col}, false)
	if err != nil {
		log.Fatal(err)
	}

	returnTypes := []*types.FieldType{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeVarchar)}
	executors := []*tipb.Executor{
		NewTableScanExecutorWithTypes(tableInfo.ID, tableInfo.Types, false),
		NewSelectionScanExecutor([]*tipb.Expr{expr}),
		NewAggregationExecutor([]*tipb.Expr{agg}, []*tipb.Expr{c.GetGroupByPB(groupById)}),
		NewLimitExecutor(2),
	}
	...
	c.SendCoprocessorRequest(context.Background(), tableInfo.ID, returnTypes, executors, rangeFunc, decodeTableRow)

About

A tikv client for testing tikv coprocessor

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages