Skip to content

Commit

Permalink
support parameter(variable) (vesoft-inc#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
czpmango committed Dec 23, 2021
1 parent ad04cc9 commit ec117b9
Show file tree
Hide file tree
Showing 6 changed files with 1,210 additions and 55 deletions.
118 changes: 118 additions & 0 deletions basic_example/parameter_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

package main

import (
"fmt"
"strings"
"sync"

nebulago "github.com/vesoft-inc/nebula-go/v2"
nebula "github.com/vesoft-inc/nebula-go/v2/nebula"
)

const (
address = "127.0.0.1"
// The default port of Nebula Graph 2.x is 9669.
// 3699 is only for testing.
port = 3699
username = "root"
password = "nebula"
)

// Initialize logger
var log = nebulago.DefaultLogger{}

func main() {
hostAddress := nebulago.HostAddress{Host: address, Port: port}
hostList := []nebulago.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebulago.GetDefaultConf()

// Initialize connection pool
pool, err := nebulago.NewConnectionPool(hostList, testPoolConfig, log)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
}
// Close all connections in the pool
defer pool.Close()
// Create session and send query in go routine
var wg sync.WaitGroup
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
// Create session
session, err := pool.GetSession(username, password)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s",
username, password, err.Error()))
}
// Release session and return connection back to connection pool
defer session.Release()

checkResultSet := func(prefix string, res *nebulago.ResultSet) {
if !res.IsSucceed() {
log.Fatal(fmt.Sprintf("%s, ErrorCode: %v, ErrorMsg: %s", prefix, res.GetErrorCode(), res.GetErrorMsg()))
}
}

var params map[string]*nebula.Value
params = make(map[string]*nebula.Value)

var bVal bool = true
var iVal int64 = 3
// bool
p1 := nebula.Value{BVal: &bVal}
// int
p2 := nebula.Value{IVal: &iVal}
// list
lSlice := []*nebula.Value{&p1,&p2}
var lVal nebula.NList
lVal.Values = lSlice
p3 := nebula.Value{LVal: &lVal}
// map
var nmap map[string]*nebula.Value = map[string]*nebula.Value{"a": &p1, "b": &p2}
var mVal nebula.NMap
mVal.Kvs = nmap
p4 := nebula.Value{MVal: &mVal}

params["p1"] = &p1
params["p2"] = &p2
params["p3"] = &p3
params["p4"] = &p4


// Extract data from the resultSet
{
query := "RETURN abs($p2)+1 AS col1, toBoolean($p1) and false AS col2, $p3, $p4.a"
// Send query
// resultSet, err := session.ExecuteWithParameter(query, params)
resultSet, err := session.ExecuteWithParameter(query,params)
if err != nil {
fmt.Print(err.Error())
return
}
checkResultSet(query, resultSet)

// Get all column names from the resultSet
colNames := resultSet.GetColNames()
fmt.Printf("Column names: %s\n", strings.Join(colNames, ", "))
fmt.Print(resultSet.AsStringTable())
// Get a row from resultSet
record, err := resultSet.GetRowValuesByIndex(0)
if err != nil {
log.Error(err.Error())
}
// Print whole row
fmt.Printf("The first row elements: %s\n", record.String())
}
}(&wg)
wg.Wait()

fmt.Print("\n")
log.Info("Nebula Go Client Gorountines Example Finished")
}
103 changes: 102 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var nebulaLog = DefaultLogger{}
// Create default configs
var testPoolConfig = GetDefaultConf()

var params map[string]*nebula.Value

// Before run `go test -v`, you should start a nebula server listening on 3699 port.
// Using docker-compose is the easiest way and you can reference this file:
// https://github.com/vesoft-inc/nebula/blob/master/docker/docker-compose.yaml
Expand Down Expand Up @@ -506,7 +508,6 @@ func TestServiceDataIO(t *testing.T) {
}
assert.Equal(t, int8(sessionCreatedTime.Hour()), localTime.GetHour())
}

dropSpace(t, session, "client_test")
}

Expand Down Expand Up @@ -960,6 +961,73 @@ func TestExecuteJson(t *testing.T) {
}
}

func TestExecuteWithParameter(t *testing.T) {
hostList := []HostAddress{{Host: address, Port: port}}

testPoolConfig = PoolConfig{
TimeOut: 0 * time.Millisecond,
IdleTime: 0 * time.Millisecond,
MaxConnPoolSize: 10,
MinConnPoolSize: 1,
}

// Initialize connection pool
pool, err := NewConnectionPool(hostList, testPoolConfig, nebulaLog)
if err != nil {
t.Fatalf("fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error())
}
// close all connections in the pool
defer pool.Close()

// Create session
session, err := pool.GetSession(username, password)
if err != nil {
t.Fatalf("fail to create a new session from connection pool, username: %s, password: %s, %s",
username, password, err.Error())
}
defer session.Release()

// Create schemas
createTestDataSchema(t, session)
// Load data
loadTestData(t, session)
// p1:true p2:3 p3:[true,3] p4:{"a":true,"b":"Bob"}
prepareParameter()
// Complex result
{
resp, err := tryToExecuteWithParameter(session, "MATCH (v:person {name: $p4.b}) WHERE v.age>$p2-3 and $p1==true RETURN v ORDER BY $p3[0] LIMIT $p2", params)
if err != nil {
t.Fatalf(err.Error())
return
}
assert.Equal(t, 1, resp.GetRowSize())
record, err := resp.GetRowValuesByIndex(0)
if err != nil {
t.Fatalf(err.Error())
return
}
valWrap, err := record.GetValueByIndex(0)
if err != nil {
t.Fatalf(err.Error())
return
}
node, err := valWrap.AsNode()
if err != nil {
t.Fatalf(err.Error())
return
}
assert.Equal(t,
"(\"Bob\" :student{name: \"Bob\"} "+
":person{age: 10, birthday: 2010-09-10T10:08:02.000000, book_num: 100, "+
"child_name: \"Hello Worl\", expend: 100.0, "+
"first_out_city: 1111, friends: 10, grade: 3, "+
"hobby: __NULL__, is_girl: false, "+
"morning: 07:10:00.000000, name: \"Bob\", "+
"property: 1000.0, start_school: 2017-09-10})",
node.String())
}
}

func TestReconnect(t *testing.T) {
hostList := poolAddress

Expand Down Expand Up @@ -1073,6 +1141,17 @@ func tryToExecute(session *Session, query string) (resp *ResultSet, err error) {
return
}

func tryToExecuteWithParameter(session *Session, query string, params map[string]*nebula.Value) (resp *ResultSet, err error) {
for i := 3; i > 0; i-- {
resp, err = session.ExecuteWithParameter(query, params)
if err == nil && resp.IsSucceed() {
return
}
time.Sleep(2 * time.Second)
}
return
}

// creates schema
func createTestDataSchema(t *testing.T, session *Session) {
createSchema := "CREATE SPACE IF NOT EXISTS test_data(vid_type = FIXED_STRING(30));" +
Expand Down Expand Up @@ -1163,6 +1242,28 @@ func loadTestData(t *testing.T, session *Session) {
checkResultSet(t, query, resultSet)
}

func prepareParameter() {
// p1:true p2:3 p3:[true,3] p4:{"a":true,"b":"Bob"}
params = make(map[string]*nebula.Value)
var bVal bool = true
var iVal int64 = 3
p1 := nebula.Value{BVal: &bVal}
p2 := nebula.Value{IVal: &iVal}
p5 := nebula.Value{SVal: []byte("Bob")}
lSlice := []*nebula.Value{&p1, &p2}
var lVal nebula.NList
lVal.Values = lSlice
p3 := nebula.Value{LVal: &lVal}
var nmap map[string]*nebula.Value = map[string]*nebula.Value{"a": &p1, "b": &p5}
var mVal nebula.NMap
mVal.Kvs = nmap
p4 := nebula.Value{MVal: &mVal}
params["p1"] = &p1
params["p2"] = &p2
params["p3"] = &p3
params["p4"] = &p4
}

func dropSpace(t *testing.T, session *Session, spaceName string) {
query := fmt.Sprintf("DROP SPACE IF EXISTS %s;", spaceName)
resultSet, err := tryToExecute(session, query)
Expand Down
23 changes: 23 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,23 @@ func (cn *connection) execute(sessionID int64, stmt string) (*graph.ExecutionRes
return resp, err
}

func (cn *connection) executeWithParameter(sessionID int64, stmt string, params map[string]*nebula.Value) (*graph.ExecutionResponse, error) {
resp, err := cn.graph.ExecuteWithParameter(sessionID, []byte(stmt), params)
if err != nil {
// reopen the connection if timeout
if _, ok := err.(thrift.TransportException); ok {
if err.(thrift.TransportException).TypeID() == thrift.TIMED_OUT {
reopenErr := cn.reopen()
if reopenErr != nil {
return nil, reopenErr
}
return cn.graph.ExecuteWithParameter(sessionID, []byte(stmt), params)
}
}
}

return resp, err
}
func (cn *connection) executeJson(sessionID int64, stmt string) ([]byte, error) {
jsonResp, err := cn.graph.ExecuteJson(sessionID, []byte(stmt))
if err != nil {
Expand All @@ -151,6 +168,12 @@ func (cn *connection) ping() bool {
return err == nil
}

// Check connection to host address
func (cn *connection) pingWithParameter() bool {
_, err := cn.executeWithParameter(0, "YIELD 1", nil)
return err == nil
}

// Sign out and release seesin ID
func (cn *connection) signOut(sessionID int64) error {
// Release session ID to graphd
Expand Down
Loading

0 comments on commit ec117b9

Please sign in to comment.