Skip to content

Commit

Permalink
feat: support schemaless with table name key
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Sep 5, 2024
1 parent 224fac6 commit 643a70d
Show file tree
Hide file tree
Showing 17 changed files with 74 additions and 48 deletions.
14 changes: 8 additions & 6 deletions controller/ws/schemaless/schemaless.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,12 @@ func (t *TaosSchemaless) connect(ctx context.Context, session *melody.Session, r
}

type schemalessWriteReq struct {
ReqID uint64 `json:"req_id"`
Protocol int `json:"protocol"`
Precision string `json:"precision"`
TTL int `json:"ttl"`
Data string `json:"data"`
ReqID uint64 `json:"req_id"`
Protocol int `json:"protocol"`
Precision string `json:"precision"`
TTL int `json:"ttl"`
Data string `json:"data"`
TableNameKey string `json:"table_name_key"`
}

type schemalessResp struct {
Expand Down Expand Up @@ -387,13 +388,14 @@ func (t *TaosSchemaless) insert(ctx context.Context, session *melody.Session, re
var err error
var totalRows int32
var affectedRows int
totalRows, result = syncinterface.TaosSchemalessInsertRawTTLWithReqID(
totalRows, result = syncinterface.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(
t.conn,
req.Data,
req.Protocol,
req.Precision,
req.TTL,
int64(req.ReqID),
req.TableNameKey,
logger,
isDebug,
)
Expand Down
13 changes: 7 additions & 6 deletions controller/ws/ws/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,11 +722,12 @@ func (h *messageHandler) handleFreeResult(_ context.Context, request Request, lo
}

type SchemalessWriteRequest struct {
ReqID uint64 `json:"req_id"`
Protocol int `json:"protocol"`
Precision string `json:"precision"`
TTL int `json:"ttl"`
Data string `json:"data"`
ReqID uint64 `json:"req_id"`
Protocol int `json:"protocol"`
Precision string `json:"precision"`
TTL int `json:"ttl"`
Data string `json:"data"`
TableNameKey string `json:"table_name_key"`
}

type SchemalessWriteResponse struct {
Expand All @@ -748,7 +749,7 @@ func (h *messageHandler) handleSchemalessWrite(_ context.Context, request Reques
}
var totalRows int32
var affectedRows int
totalRows, result := syncinterface.TaosSchemalessInsertRawTTLWithReqID(h.conn, req.Data, req.Protocol, req.Precision, req.TTL, int64(request.ReqID), logger, isDebug)
totalRows, result := syncinterface.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(h.conn, req.Data, req.Protocol, req.Precision, req.TTL, int64(request.ReqID), req.TableNameKey, logger, isDebug)
logger.Tracef("total_rows:%d, result:%p", totalRows, result)
defer syncinterface.FreeResult(result, logger, isDebug)
affectedRows = wrapper.TaosAffectedRows(result)
Expand Down
20 changes: 20 additions & 0 deletions db/syncinterface/wrapper.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package syncinterface

/*
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <taos.h>
*/

import "C"
import (
"database/sql/driver"
"strings"
Expand Down Expand Up @@ -324,3 +332,15 @@ func TaosStmtGetParam(stmt unsafe.Pointer, index int, logger *logrus.Entry, isDe
thread.Unlock()
return dataType, dataLength, err
}

func TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn unsafe.Pointer, lines string, protocol int, precision string, ttl int, reqID int64, tbNameKey string, logger *logrus.Entry, isDebug bool) (int32, unsafe.Pointer) {
logger.Tracef("call taos_schemaless_insert_raw_ttl_with_reqid_tbname_key, conn:%p, lines:%s, protocol:%d, precision:%s, ttl:%d, reqID:%d, tbnameKey ", conn, lines, protocol, precision, ttl, reqID)
s := log.GetLogNow(isDebug)
thread.Lock()
logger.Debugf("get thread lock for taos_schemaless_insert_raw_ttl_with_reqid_tbname_key cost:%s", log.GetLogDuration(isDebug, s))
s = log.GetLogNow(isDebug)
rows, result := wrapper.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn, lines, protocol, precision, ttl, reqID, tbNameKey)
logger.Debugf("taos_schemaless_insert_raw_ttl_with_reqid_tbname_key finish, rows:%d, result:%p, cost:%s", rows, result, log.GetLogDuration(isDebug, s))
thread.Unlock()
return rows, result
}
6 changes: 3 additions & 3 deletions db/syncinterface/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func TestTaosSelectDB(t *testing.T) {
assert.NotEqual(t, 0, code)
}

func TestTaosSchemalessInsertRawTTLWithReqID(t *testing.T) {
func TestTaosSchemalessInsertRawTTLWithReqIDTBNameKey(t *testing.T) {
reqID := generator.GetReqID()
var logger = logger.WithField("test", "TaosSchemalessInsertRawTTLWithReqID").WithField(config.ReqIDKey, reqID)
var logger = logger.WithField("test", "TestTaosSchemalessInsertRawTTLWithReqIDTBNameKey").WithField(config.ReqIDKey, reqID)
conn, err := TaosConnect("", "root", "taosdata", "", 0, logger, isDebug)
if !assert.NoError(t, err) {
return
Expand All @@ -69,7 +69,7 @@ func TestTaosSchemalessInsertRawTTLWithReqID(t *testing.T) {
}()
code := TaosSelectDB(conn, "syncinterface_test_sml", logger, isDebug)
assert.Equal(t, 0, code)
errCode, result := TaosSchemalessInsertRawTTLWithReqID(conn, "measurement,host=host1 field1=2i,field2=2.0 1577836800000000000", wrapper.InfluxDBLineProtocol, "", 0, reqID, logger, isDebug)
errCode, result := TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn, "measurement,host=host1 field1=2i,field2=2.0 1577836800000000000", wrapper.InfluxDBLineProtocol, "", 0, reqID, "", logger, isDebug)
assert.Equal(t, int32(1), errCode)
assert.NotNil(t, result)
FreeResult(result, logger, isDebug)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/spf13/viper v1.14.0
github.com/stretchr/testify v1.9.0
github.com/swaggo/swag v1.8.8
github.com/taosdata/driver-go/v3 v3.5.1-0.20240829061127-2a7d73120e08
github.com/taosdata/driver-go/v3 v3.5.1-0.20240905075738-5d27ebcc3e79
github.com/taosdata/file-rotatelogs/v2 v2.5.0
go.uber.org/automaxprocs v1.5.1
golang.org/x/sync v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2609,8 +2609,8 @@ github.com/swaggo/swag v1.8.8/go.mod h1:ezQVUUhly8dludpVk+/PuwJWvLLanB13ygV5Pr9e
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/taosdata/driver-go/v3 v3.5.1-0.20240829061127-2a7d73120e08 h1:57qVc5NrCbHwe25XjTnUrJobxQMJ2hWZz1l3MNfOxhA=
github.com/taosdata/driver-go/v3 v3.5.1-0.20240829061127-2a7d73120e08/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU=
github.com/taosdata/driver-go/v3 v3.5.1-0.20240905075738-5d27ebcc3e79 h1:HL1/cmYJz48nWnwwjw4DoCikgubW2DdxETN7IN9XWz0=
github.com/taosdata/driver-go/v3 v3.5.1-0.20240905075738-5d27ebcc3e79/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU=
github.com/taosdata/file-rotatelogs/v2 v2.5.0 h1:7iqDggodohXowoE9Y2CsUH0Esz9zGcoSfkI0cVU/z5E=
github.com/taosdata/file-rotatelogs/v2 v2.5.0/go.mod h1:Qm99Lh0iMZouGgyy++JgTqKvP5FQw1ruR5jkWF7e1n0=
github.com/taosdata/melody v0.0.0-20240407104517-11dcf4a47591 h1:JT7pgLJpQvmSGPAFVWJZG/bPyuAio0uY3cb7AKyfhGY=
Expand Down
2 changes: 1 addition & 1 deletion plugin/collectd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (p *Plugin) HandleMetrics(serializer *influx.Serializer, clientIP net.IP, m
execLogger := logger.WithField(config.ReqIDKey, reqID)
execLogger.Debugf("insert lines, data:%s, db:%s, ttl:%d", data, p.conf.DB, p.conf.TTL)
start := log.GetLogNow(isDebug)
err = inserter.InsertInfluxdb(taosConn.TaosConnection, data, p.conf.DB, "ns", p.conf.TTL, uint64(reqID), execLogger)
err = inserter.InsertInfluxdb(taosConn.TaosConnection, data, p.conf.DB, "ns", p.conf.TTL, uint64(reqID), "", execLogger)
logger.Debugf("insert lines finish, cost:%s", log.GetLogDuration(isDebug, start))
if err != nil {
logger.Errorf("insert lines error, err:%s, data:%s", err, data)
Expand Down
4 changes: 2 additions & 2 deletions plugin/influxdb/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (p *Influxdb) write(c *gin.Context) {
return
}
}

tableNameKey := c.Query("table_name_key")
data, err := c.GetRawData()
if err != nil {
logger.Errorf("read line error, err:%s", err)
Expand Down Expand Up @@ -182,7 +182,7 @@ func (p *Influxdb) write(c *gin.Context) {
conn := taosConn.TaosConnection
s = log.GetLogNow(isDebug)
logger.Tracef("start insert influxdb, data:%s", data)
err = inserter.InsertInfluxdb(conn, data, db, precision, ttl, reqID, logger)
err = inserter.InsertInfluxdb(conn, data, db, precision, ttl, reqID, tableNameKey, logger)
logger.Debugf("finish insert influxdb, cost:%s", log.GetLogDuration(isDebug, s))
if err != nil {
logger.Errorf("insert line error, data:%s, err:%s", data, err)
Expand Down
2 changes: 1 addition & 1 deletion plugin/nodeexporter/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (p *NodeExporter) requestSingle(conn unsafe.Pointer, req *Req) error {
}
reqID := generator.GetReqID()
execLogger := logger.WithField(common.ReqIDKey, reqID)
err = inserter.InsertInfluxdb(conn, data, p.conf.DB, "ns", p.conf.TTL, uint64(reqID), execLogger)
err = inserter.InsertInfluxdb(conn, data, p.conf.DB, "ns", p.conf.TTL, uint64(reqID), "", execLogger)
if err != nil {
logger.WithError(err).Error("insert influxdb error", string(data))
return err
Expand Down
13 changes: 8 additions & 5 deletions plugin/opentsdb/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func (p *Plugin) insertJson(c *gin.Context) {
return
}
}

tableNameKey := c.Query("table_name_key")
logger.Tracef("request table_name_key:%s", tableNameKey)
s := log.GetLogNow(isDebug)
taosConn, err := commonpool.GetConnection(user, password, iptool.GetRealIP(c.Request))
logger.Debugf("get connection finish, cost:%s", log.GetLogDuration(isDebug, s))
Expand All @@ -147,8 +148,8 @@ func (p *Plugin) insertJson(c *gin.Context) {
}
}()
s = log.GetLogNow(isDebug)
logger.Debugf("insert json payload, data:%s, db:%s, ttl:%d,", data, db, ttl)
err = inserter.InsertOpentsdbJson(taosConn.TaosConnection, data, db, ttl, reqID, logger)
logger.Debugf("insert json payload, data:%s, db:%s, ttl:%d, table_name_key:%s", data, db, ttl, tableNameKey)
err = inserter.InsertOpentsdbJson(taosConn.TaosConnection, data, db, ttl, reqID, tableNameKey, logger)
logger.Debugf("insert json payload finish, cost:%s", log.GetLogDuration(isDebug, s))
if err != nil {
logger.Errorf("insert json payload error, err:%s, data:%s", err, data)
Expand Down Expand Up @@ -213,6 +214,8 @@ func (p *Plugin) insertTelnet(c *gin.Context) {
}
}

tableNameKey := c.Query("table_name_key")
logger.Tracef("request table_name_key:%s", tableNameKey)
rd := bufio.NewReader(c.Request.Body)
var lines []string
tmp := pool.BytesPoolGet()
Expand Down Expand Up @@ -261,8 +264,8 @@ func (p *Plugin) insertTelnet(c *gin.Context) {
}
}()
s = log.GetLogNow(isDebug)
logger.Debugf("insert telnet payload, lines:%v, db:%s, ttl:%d", lines, db, ttl)
err = inserter.InsertOpentsdbTelnetBatch(taosConn.TaosConnection, lines, db, ttl, reqID, logger)
logger.Debugf("insert telnet payload, lines:%v, db:%s, ttl:%d, table_name_key: %s", lines, db, ttl, tableNameKey)
err = inserter.InsertOpentsdbTelnetBatch(taosConn.TaosConnection, lines, db, ttl, reqID, tableNameKey, logger)
logger.Debugf("insert telnet payload finish, cost:%s", log.GetLogDuration(isDebug, s))
if err != nil {
logger.Errorf("insert telnet payload error, err:%s, lines:%v", err, lines)
Expand Down
2 changes: 1 addition & 1 deletion plugin/opentsdbtelnet/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (p *Plugin) handleData(connection *Connection, line []string, clientIP net.
reqID := generator.GetReqID()
logger := logger.WithField(config.ReqIDKey, reqID)
logger.Debugf("insert telnet payload, lines:%s", line)
err = inserter.InsertOpentsdbTelnetBatch(taosConn.TaosConnection, line, connection.db, p.conf.TTL, uint64(reqID), logger)
err = inserter.InsertOpentsdbTelnetBatch(taosConn.TaosConnection, line, connection.db, p.conf.TTL, uint64(reqID), "", logger)
if err != nil {
logger.WithError(err).Errorln("insert telnet payload error :", line)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/statsd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (p *Plugin) HandleMetrics(serializer *influx.Serializer, metric telegraf.Me
reqID := generator.GetReqID()
execLogger := logger.WithField(config.ReqIDKey, reqID)
execLogger.Debugf("insert line,req_id:0x%x,data: %s", reqID, string(data))
err = inserter.InsertInfluxdb(taosConn.TaosConnection, data, p.conf.DB, "ns", p.conf.TTL, uint64(reqID), execLogger)
err = inserter.InsertInfluxdb(taosConn.TaosConnection, data, p.conf.DB, "ns", p.conf.TTL, uint64(reqID), "", execLogger)
execLogger.Debugf("insert line finish cost:%s", log.GetLogDuration(isDebug, start))
if err != nil {
execLogger.WithError(err).Errorln("insert lines error", string(data))
Expand Down
4 changes: 2 additions & 2 deletions schemaless/capi/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/taosdata/taosadapter/v3/tools/generator"
)

func InsertInfluxdb(conn unsafe.Pointer, data []byte, db, precision string, ttl int, reqID int64, logger *logrus.Entry) error {
func InsertInfluxdb(conn unsafe.Pointer, data []byte, db, precision string, ttl int, reqID int64, tableNameKey string, logger *logrus.Entry) error {
if reqID == 0 {
reqID = generator.GetReqID()
}
Expand All @@ -26,7 +26,7 @@ func InsertInfluxdb(conn unsafe.Pointer, data []byte, db, precision string, ttl

var result unsafe.Pointer

_, result = syncinterface.TaosSchemalessInsertRawTTLWithReqID(conn, d, wrapper.InfluxDBLineProtocol, precision, ttl, reqID, logger, log.IsDebug())
_, result = syncinterface.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn, d, wrapper.InfluxDBLineProtocol, precision, ttl, reqID, tableNameKey, logger, log.IsDebug())

defer func() {
syncinterface.FreeResult(result, logger, log.IsDebug())
Expand Down
2 changes: 1 addition & 1 deletion schemaless/capi/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestInsertInfluxdb(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := logrus.New().WithField("test", "TestInsertInfluxdb")
err := capi.InsertInfluxdb(tt.args.taosConnect, tt.args.data, tt.args.db, tt.args.precision, tt.args.ttl, 0, logger)
err := capi.InsertInfluxdb(tt.args.taosConnect, tt.args.data, tt.args.db, tt.args.precision, tt.args.ttl, 0, "", logger)
if (err != nil) != tt.wantErr {
t.Errorf("InsertInfluxdb() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
12 changes: 6 additions & 6 deletions schemaless/capi/opentsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/taosdata/taosadapter/v3/tools/generator"
)

func InsertOpentsdbJson(conn unsafe.Pointer, data []byte, db string, ttl int, reqID int64, logger *logrus.Entry) error {
func InsertOpentsdbJson(conn unsafe.Pointer, data []byte, db string, ttl int, reqID int64, tableNameKey string, logger *logrus.Entry) error {
if len(data) == 0 {
return nil
}
Expand All @@ -22,8 +22,8 @@ func InsertOpentsdbJson(conn unsafe.Pointer, data []byte, db string, ttl int, re
}

var result unsafe.Pointer
_, result = syncinterface.TaosSchemalessInsertRawTTLWithReqID(conn, string(data), wrapper.OpenTSDBJsonFormatProtocol,
"", ttl, getReqID(reqID), logger, log.IsDebug())
_, result = syncinterface.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn, string(data), wrapper.OpenTSDBJsonFormatProtocol,
"", ttl, getReqID(reqID), tableNameKey, logger, log.IsDebug())

defer func() {
syncinterface.FreeResult(result, logger, log.IsDebug())
Expand All @@ -34,7 +34,7 @@ func InsertOpentsdbJson(conn unsafe.Pointer, data []byte, db string, ttl int, re
return nil
}

func InsertOpentsdbTelnet(conn unsafe.Pointer, data []string, db string, ttl int, reqID int64, logger *logrus.Entry) error {
func InsertOpentsdbTelnet(conn unsafe.Pointer, data []string, db string, ttl int, reqID int64, tableNameKey string, logger *logrus.Entry) error {
trimData := make([]string, 0, len(data))
for i := 0; i < len(data); i++ {
if len(data[i]) == 0 {
Expand All @@ -50,8 +50,8 @@ func InsertOpentsdbTelnet(conn unsafe.Pointer, data []string, db string, ttl int
}

var result unsafe.Pointer
_, result = syncinterface.TaosSchemalessInsertRawTTLWithReqID(conn, strings.Join(trimData, "\n"),
wrapper.OpenTSDBTelnetLineProtocol, "", ttl, getReqID(reqID), logger, log.IsDebug())
_, result = syncinterface.TaosSchemalessInsertRawTTLWithReqIDTBNameKey(conn, strings.Join(trimData, "\n"),
wrapper.OpenTSDBTelnetLineProtocol, "", ttl, getReqID(reqID), tableNameKey, logger, log.IsDebug())
defer func() {
syncinterface.FreeResult(result, logger, log.IsDebug())
}()
Expand Down
8 changes: 4 additions & 4 deletions schemaless/capi/opentsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestInsertOpentsdbTelnet(t *testing.T) {
logger := logrus.New().WithField("test", "TestInsertOpentsdbTelnet")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := capi.InsertOpentsdbTelnet(tt.args.taosConnect, []string{tt.args.data}, tt.args.db, tt.args.ttl, 0, logger); (err != nil) != tt.wantErr {
if err := capi.InsertOpentsdbTelnet(tt.args.taosConnect, []string{tt.args.data}, tt.args.db, tt.args.ttl, 0, "", logger); (err != nil) != tt.wantErr {
t.Errorf("InsertOpentsdbTelnet() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down Expand Up @@ -125,7 +125,7 @@ func BenchmarkTelnet(b *testing.B) {
//`sys.if.bytes.out`,`host`=web01,`interface`=eth0
//t_98df8453856519710bfc2f1b5f8202cf
//t_98df8453856519710bfc2f1b5f8202cf
err := capi.InsertOpentsdbTelnet(conn, []string{`put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0`}, "test", 0, 0, logger)
err := capi.InsertOpentsdbTelnet(conn, []string{`put sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0`}, "test", 0, 0, "", logger)
if err != nil {
b.Error(err)
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestInsertOpentsdbJson(t *testing.T) {
logger := logrus.New().WithField("test", "TestInsertOpentsdbJson")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := capi.InsertOpentsdbJson(tt.args.taosConnect, tt.args.data, tt.args.db, tt.args.ttl, 0, logger); (err != nil) != tt.wantErr {
if err := capi.InsertOpentsdbJson(tt.args.taosConnect, tt.args.data, tt.args.db, tt.args.ttl, 0, "", logger); (err != nil) != tt.wantErr {
t.Errorf("InsertOpentsdbJson() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestInsertOpentsdbTelnetBatch(t *testing.T) {
logger := logrus.New().WithField("test", "TestInsertOpentsdbTelnetBatch")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := capi.InsertOpentsdbTelnet(tt.args.taosConnect, tt.args.data, tt.args.db, tt.args.ttl, 0, logger); (err != nil) != tt.wantErr {
if err := capi.InsertOpentsdbTelnet(tt.args.taosConnect, tt.args.data, tt.args.db, tt.args.ttl, 0, "", logger); (err != nil) != tt.wantErr {
t.Errorf("InsertOpentsdbTelnet() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
12 changes: 6 additions & 6 deletions schemaless/inserter/c.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (
"github.com/taosdata/taosadapter/v3/tools/generator"
)

func InsertInfluxdb(taosConnect unsafe.Pointer, data []byte, db, precision string, ttl int, reqID uint64, logger *logrus.Entry) error {
return capi.InsertInfluxdb(taosConnect, data, db, precision, ttl, getReqID(reqID), logger)
func InsertInfluxdb(taosConnect unsafe.Pointer, data []byte, db, precision string, ttl int, reqID uint64, tableNameKey string, logger *logrus.Entry) error {
return capi.InsertInfluxdb(taosConnect, data, db, precision, ttl, getReqID(reqID), tableNameKey, logger)
}

func InsertOpentsdbJson(taosConnect unsafe.Pointer, data []byte, db string, ttl int, reqID uint64, logger *logrus.Entry) error {
return capi.InsertOpentsdbJson(taosConnect, data, db, ttl, getReqID(reqID), logger)
func InsertOpentsdbJson(taosConnect unsafe.Pointer, data []byte, db string, ttl int, reqID uint64, tableNameKey string, logger *logrus.Entry) error {
return capi.InsertOpentsdbJson(taosConnect, data, db, ttl, getReqID(reqID), tableNameKey, logger)
}

func InsertOpentsdbTelnetBatch(taosConnect unsafe.Pointer, data []string, db string, ttl int, reqID uint64, logger *logrus.Entry) error {
return capi.InsertOpentsdbTelnet(taosConnect, data, db, ttl, getReqID(reqID), logger)
func InsertOpentsdbTelnetBatch(taosConnect unsafe.Pointer, data []string, db string, ttl int, reqID uint64, tableNameKey string, logger *logrus.Entry) error {
return capi.InsertOpentsdbTelnet(taosConnect, data, db, ttl, getReqID(reqID), tableNameKey, logger)
}

func getReqID(id uint64) int64 {
Expand Down

0 comments on commit 643a70d

Please sign in to comment.