Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate ZMQ to GNMI to improve Dash GNMI API performance. #135

Merged
merged 7 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions gnmi_server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Config struct {
UserAuth AuthTypes
EnableTranslibWrite bool
EnableNativeWrite bool
ZmqAddress string
IdleConnDuration int
}

Expand Down Expand Up @@ -343,7 +344,7 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe
return nil, err
}
if check := IsNativeOrigin(origin); check {
dc, err = sdc.NewMixedDbClient(paths, prefix, origin)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, s.config.ZmqAddress)
} else {
dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions)
}
Expand Down Expand Up @@ -410,7 +411,7 @@ func (s *Server) Set(ctx context.Context, req *gnmipb.SetRequest) (*gnmipb.SetRe
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
return nil, grpc.Errorf(codes.Unimplemented, "GNMI native write is disabled")
}
dc, err = sdc.NewMixedDbClient(paths, prefix, origin)
dc, err = sdc.NewMixedDbClient(paths, prefix, origin, s.config.ZmqAddress)
} else {
if s.config.EnableTranslibWrite == false {
common_utils.IncCounter(common_utils.GNMI_SET_FAIL)
Expand Down Expand Up @@ -485,7 +486,7 @@ func (s *Server) Capabilities(ctx context.Context, req *gnmipb.CapabilityRequest
var supportedModels []gnmipb.ModelData
dc, _ := sdc.NewTranslClient(nil, nil, ctx, extensions)
supportedModels = append(supportedModels, dc.Capabilities()...)
dc, _ = sdc.NewMixedDbClient(nil, nil, "")
dc, _ = sdc.NewMixedDbClient(nil, nil, "", s.config.ZmqAddress)
supportedModels = append(supportedModels, dc.Capabilities()...)

suppModels := make([]*gnmipb.ModelData, len(supportedModels))
Expand Down
94 changes: 94 additions & 0 deletions sonic_data_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"errors"
"testing"
"os"
"time"
"reflect"
"io/ioutil"
"encoding/json"
"fmt"

"github.com/jipanyang/gnxi/utils/xpath"
"github.com/sonic-net/sonic-gnmi/swsscommon"
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
)

Expand Down Expand Up @@ -349,3 +352,94 @@ func TestNonDbClientGetError(t *testing.T) {
t.Errorf("Expected error from NonDbClient.Get, got nil")
}
}

/*
Helper method for receive data from ZmqConsumerStateTable
consumer: Receive data from consumer
return:
true: data received
false: not receive any data after retry
*/
func ReceiveFromZmq(consumer swsscommon.ZmqConsumerStateTable) (bool) {
liuh-80 marked this conversation as resolved.
Show resolved Hide resolved
receivedData := swsscommon.NewKeyOpFieldsValuesQueue()
retry := 0;
for {
// sender's ZMQ may disconnect, wait and retry for reconnect
time.Sleep(time.Duration(1000) * time.Millisecond)
consumer.Pops(receivedData)
if receivedData.Size() == 0 {
retry++
if retry >= 10 {
return false
}
} else {
return true
}
}
}

func TestZmqReconnect(t *testing.T) {
// create ZMQ server
db := swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false)
zmqServer := swsscommon.NewZmqServer("tcp://*:1234")
var TEST_TABLE string = "DASH_ROUTE"
consumer := swsscommon.NewZmqConsumerStateTable(db, TEST_TABLE, zmqServer)

// create ZMQ client side
zmqAddress := "tcp://127.0.0.1:1234"
client := MixedDbClient {
applDB : swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false),
tableMap : map[string]swsscommon.ProducerStateTable{},
zmqClient : swsscommon.NewZmqClient(zmqAddress),
}

data := map[string]string{}
var TEST_KEY string = "TestKey"
client.DbSetTable(TEST_TABLE, TEST_KEY, data)
if !ReceiveFromZmq(consumer) {
t.Errorf("Receive data from ZMQ failed")
}

// recreate ZMQ server to trigger re-connect
swsscommon.DeleteZmqConsumerStateTable(consumer)
swsscommon.DeleteZmqServer(zmqServer)
zmqServer = swsscommon.NewZmqServer("tcp://*:1234")
consumer = swsscommon.NewZmqConsumerStateTable(db, TEST_TABLE, zmqServer)

// send data again, client will reconnect
client.DbSetTable(TEST_TABLE, TEST_KEY, data)
if !ReceiveFromZmq(consumer) {
t.Errorf("Receive data from ZMQ failed")
}
}

func TestRetryHelper(t *testing.T) {
// create ZMQ server
zmqServer := swsscommon.NewZmqServer("tcp://*:2234")

// create ZMQ client side
zmqAddress := "tcp://127.0.0.1:2234"
zmqClient := swsscommon.NewZmqClient(zmqAddress)
returnError := true
exeCount := 0
RetryHelper(
zmqClient,
func () (err error) {
exeCount++
if returnError {
returnError = false
return fmt.Errorf("connection_reset")
}
return nil
})

if exeCount == 1 {
t.Errorf("RetryHelper does not retry")
}

if exeCount > 2 {
t.Errorf("RetryHelper retry too much")
}

swsscommon.DeleteZmqServer(zmqServer)
}
126 changes: 107 additions & 19 deletions sonic_data_client/mixed_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ import (

const REDIS_SOCK string = "/var/run/redis/redis.sock"
const APPL_DB int = 0
const APPL_DB_NAME string = "APPL_DB"
const DASH_TABLE_PREFIX string = "DASH_"
const SWSS_TIMEOUT uint = 0
const MAX_RETRY_COUNT uint = 5
const RETYRY_DELAY_MILLISECOND uint = 100
zbud-msft marked this conversation as resolved.
Show resolved Hide resolved
const RETYRY_DELAY_FACTOR uint = 2
const CHECK_POINT_PATH string = "/etc/sonic"

const (
Expand Down Expand Up @@ -59,13 +64,37 @@ type MixedDbClient struct {
workPath string
jClient *JsonClient
applDB swsscommon.DBConnector
zmqClient swsscommon.ZmqClient
tableMap map[string]swsscommon.ProducerStateTable

synced sync.WaitGroup // Control when to send gNMI sync_response
w *sync.WaitGroup // wait for all sub go routines to finish
mu sync.RWMutex // Mutex for data protection among routines for DbClient
}

var mixedDbClientMap = map[string]MixedDbClient{}

func getMixedDbClient(zmqAddress string) (MixedDbClient) {
client, ok := mixedDbClientMap[zmqAddress]
if !ok {
client = MixedDbClient {
applDB : swsscommon.NewDBConnector(APPL_DB_NAME, SWSS_TIMEOUT, false),
tableMap : map[string]swsscommon.ProducerStateTable{},
}

// enable ZMQ by zmqAddress parameter
if zmqAddress != "" {
client.zmqClient = swsscommon.NewZmqClient(zmqAddress)
} else {
client.zmqClient = nil
}

mixedDbClientMap[zmqAddress] = client
}

return client
}

func parseJson(str []byte) (interface{}, error) {
var res interface{}
err := json.Unmarshal(str, &res)
Expand Down Expand Up @@ -98,42 +127,107 @@ func ParseTarget(target string, paths []*gnmipb.Path) (string, error) {
return target, nil
}

func (c *MixedDbClient) DbSetTable(table string, key string, values map[string]string) error {
func (c *MixedDbClient) GetTable(table string) (swsscommon.ProducerStateTable) {
pt, ok := c.tableMap[table]
if !ok {
pt = swsscommon.NewProducerStateTable(c.applDB, table)
if strings.HasPrefix(table, DASH_TABLE_PREFIX) && c.zmqClient != nil {
log.V(2).Infof("Create ZmqProducerStateTable: %s", table)
pt = swsscommon.NewZmqProducerStateTable(c.applDB, table, c.zmqClient)
} else {
log.V(2).Infof("Create ProducerStateTable: %s", table)
pt = swsscommon.NewProducerStateTable(c.applDB, table)
}

c.tableMap[table] = pt
}

return pt
}

func CatchException(err *error) {
if r := recover(); r != nil {
*err = fmt.Errorf("%v", r)
}
}

func ProducerStateTableSetWrapper(pt swsscommon.ProducerStateTable, key string, value swsscommon.FieldValuePairs) (err error) {
// convert panic to error
defer CatchException(&err)
pt.Set(key, value, "SET", "")
return
}

func ProducerStateTableDeleteWrapper(pt swsscommon.ProducerStateTable, key string) (err error) {
// convert panic to error
defer CatchException(&err)
pt.Delete(key, "DEL", "")
return
}

type ActionNeedRetry func() error

func RetryHelper(zmqClient swsscommon.ZmqClient, action ActionNeedRetry) {
var retry uint = 0
var retry_delay = time.Duration(RETYRY_DELAY_MILLISECOND) * time.Millisecond
zbud-msft marked this conversation as resolved.
Show resolved Hide resolved
ConnectionResetErr := "connection_reset"
for {
err := action()
if err != nil {
if (err.Error() == ConnectionResetErr && retry <= MAX_RETRY_COUNT) {
log.V(6).Infof("RetryHelper: connection reset, reconnect and retry later")
time.Sleep(retry_delay)

zmqClient.Connect()
retry_delay *= time.Duration(RETYRY_DELAY_FACTOR)
retry++
continue
}

panic(err)
}

return
}
}

func (c *MixedDbClient) DbSetTable(table string, key string, values map[string]string) error {
vec := swsscommon.NewFieldValuePairs()
defer swsscommon.DeleteFieldValuePairs(vec)
for k, v := range values {
pair := swsscommon.NewFieldValuePair(k, v)
vec.Add(pair)
swsscommon.DeleteFieldValuePair(pair)
}
pt.Set(key, vec, "SET", "")

pt := c.GetTable(table)
RetryHelper(
c.zmqClient,
func () error {
return ProducerStateTableSetWrapper(pt, key, vec)
})
return nil
}

func (c *MixedDbClient) DbDelTable(table string, key string) error {
pt, ok := c.tableMap[table]
if !ok {
pt = swsscommon.NewProducerStateTable(c.applDB, table)
c.tableMap[table] = pt
}
pt.Delete(key, "DEL", "")
pt := c.GetTable(table)
RetryHelper(
c.zmqClient,
func () error {
return ProducerStateTableDeleteWrapper(pt, key)
})

return nil
}

func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string) (Client, error) {
var client MixedDbClient
func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string, zmqAddress string) (Client, error) {
var err error

// Testing program may ask to use redis local tcp connection
if UseRedisLocalTcpPort {
useRedisTcpClient()
}

var client = getMixedDbClient(zmqAddress)
client.prefix = prefix
client.target = ""
client.origin = origin
Expand All @@ -159,8 +253,6 @@ func NewMixedDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path, origin string)
}
client.paths = paths
client.workPath = common_utils.GNMI_WORK_PATH
client.applDB = swsscommon.NewDBConnector(APPL_DB, REDIS_SOCK, SWSS_TIMEOUT)
client.tableMap = map[string]swsscommon.ProducerStateTable{}

return &client, nil
}
Expand Down Expand Up @@ -1059,16 +1151,12 @@ func (c *MixedDbClient) Capabilities() []gnmipb.ModelData {
}

func (c *MixedDbClient) Close() error {
for _, pt := range c.tableMap {
swsscommon.DeleteProducerStateTable(pt)
}
swsscommon.DeleteDBConnector(c.applDB)
// Do nothing here, because MixedDbClient will be cache in mixedDbClientMap and reuse
return nil
}

func (c *MixedDbClient) SentOne(val *Value) {
}

func (c *MixedDbClient) FailedSend() {
}

}
2 changes: 2 additions & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.")
serverCert = flag.String("server_crt", "", "TLS server certificate")
serverKey = flag.String("server_key", "", "TLS server private key")
zmqAddress = flag.String("zmq_address", "", "Orchagent ZMQ address, when not set or empty string telemetry server will switch to Redis based communication channel.")
insecure = flag.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!")
noTLS = flag.Bool("noTLS", false, "disable TLS, for testing only!")
allowNoClientCert = flag.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate.")
Expand Down Expand Up @@ -81,6 +82,7 @@ func main() {
cfg.EnableTranslibWrite = bool(*gnmi_translib_write)
cfg.EnableNativeWrite = bool(*gnmi_native_write)
cfg.LogLevel = 3
cfg.ZmqAddress = *zmqAddress
cfg.Threshold = int(*threshold)
cfg.IdleConnDuration = int(*idle_conn_duration)
var opts []grpc.ServerOption
Expand Down