Skip to content

Commit

Permalink
Add session pool (#229)
Browse files Browse the repository at this point in the history
* temp

* Add session pool

* Add example and test

* Add multi-thread test

Update README

Fix test

* Handle auth failure

* Handle bad service

* Handle auth error
  • Loading branch information
Aiee authored Oct 20, 2022
1 parent 5254be2 commit 25eb787
Show file tree
Hide file tree
Showing 7 changed files with 846 additions and 3 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ with connection_pool.session_context('root', 'nebula') as session:
connection_pool.close()
```

## Example of using session pool
```
There are some limitations while using the session pool:
1. There MUST be an existing space in the DB before initializing the session pool.
2. Each session pool is corresponding to a single USER and a single Space. This is to ensure that the user's access control is consistent. i.g. The same user may have different access privileges in different spaces. If you need to run queries in different spaces, you may have multiple session pools.
3. Every time when sessinPool.execute() is called, the session will execute the query in the space set in the session pool config.
4. Commands that alter passwords or drop users should NOT be executed via session pool.
```
see /example/SessinPoolExample.py
## Quick example to fetch result to dataframe

```python
Expand Down
79 changes: 79 additions & 0 deletions example/SessinPoolExample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python
# --coding:utf-8--

# Copyright (c) 2022 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.


import time
from nebula3.common.ttypes import ErrorCode

from nebula3.gclient.net import Connection
from nebula3.gclient.net.SessionPool import SessionPool
from nebula3.Config import SessionPoolConfig
from nebula3.common import *
from FormatResp import print_resp

if __name__ == '__main__':
ip = '127.0.0.1'
port = 3699

try:
config = SessionPoolConfig()

# prepare space
conn = Connection()
conn.open(ip, port, 1000)
auth_result = conn.authenticate('root', 'nebula')
assert auth_result.get_session_id() != 0
resp = conn.execute(
auth_result._session_id,
'CREATE SPACE IF NOT EXISTS session_pool_test(vid_type=FIXED_STRING(30))',
)
assert resp.error_code == ErrorCode.SUCCEEDED
# insert data need to sleep after create schema
time.sleep(10)

# init session pool
session_pool = SessionPool('root', 'nebula', 'session_pool_test', [(ip, port)])
assert session_pool.init(config)

# add schema
resp = session_pool.execute(
'CREATE TAG IF NOT EXISTS person(name string, age int);'
'CREATE EDGE like (likeness double);'
)
# insert vertex
resp = session_pool.execute(
'INSERT VERTEX person(name, age) VALUES "Bob":("Bob", 10), "Lily":("Lily", 9)'
)
assert resp.is_succeeded(), resp.error_msg()

# insert edges
resp = session_pool.execute(
'INSERT EDGE like(likeness) VALUES "Bob"->"Lily":(80.0);'
)
assert resp.is_succeeded(), resp.error_msg()

resp = session_pool.execute('FETCH PROP ON person "Bob" YIELD vertex as node')
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)

resp = session_pool.execute('FETCH PROP ON like "Bob"->"Lily" YIELD edge as e')
assert resp.is_succeeded(), resp.error_msg()
print_resp(resp)

# drop space
conn.execute(
auth_result._session_id,
'DROP SPACE session_pool_test',
)

print("Example finished")

except Exception as x:
import traceback

print(traceback.format_exc())
exit(1)
16 changes: 16 additions & 0 deletions nebula3/Config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,19 @@ class SSL_config(object):
keyfile = None
certfile = None
allow_weak_ssl_versions = False


class SessionPoolConfig(object):
"""The configs for the session pool
@ timeout(int): the timeout of the session
@ idle_time(int): the idle time of the session
@ max_size(int): the max size of the session
@ min_size(int): the min size of the session
@ interval_check(int): the interval to check the idle time of the session
"""

timeout = 0
idle_time = 0
max_size = 30
min_size = 1
interval_check = -1
6 changes: 6 additions & 0 deletions nebula3/Exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ def __init__(self):
self.message = 'No extra connection'


class NoValidSessionException(Exception):
def __init__(self, message):
Exception.__init__(self, message)
self.message = 'Failed to get a valid session from the pool: {}'.format(message)


class InValidHostname(Exception):
def __init__(self, message):
Exception.__init__(self, message)
Expand Down
37 changes: 34 additions & 3 deletions nebula3/gclient/net/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ def __init__(self, connection, auth_result: AuthResult, pool, retry_connect=True
self._timezone_offset = auth_result.get_timezone_offset()
self._connection = connection
self._timezone = 0
# connection the where the session was created, if session pool was used
self._pool = pool
self._retry_connect = retry_connect
# the time stamp when the session was added to the idle list of the session pool
self._idle_time_start = 0

def execute_parameter(self, stmt, params):
"""execute statement
Expand All @@ -33,7 +36,7 @@ def execute_parameter(self, stmt, params):
:return: ResultSet
"""
if self._connection is None:
raise RuntimeError('The session has released')
raise RuntimeError('The session has been released')
try:
start_time = time.time()
resp = self._connection.execute_parameter(self._session_id, stmt, params)
Expand Down Expand Up @@ -199,7 +202,7 @@ def execute_json_with_parameter(self, stmt, params):
:return: JSON string
"""
if self._connection is None:
raise RuntimeError('The session has released')
raise RuntimeError('The session has been released')
try:
resp_json = self._connection.execute_json_with_parameter(
self._session_id, stmt, params
Expand Down Expand Up @@ -234,14 +237,27 @@ def release(self):
self._connection = None

def ping(self):
"""check the connection is ok
"""ping at connection level check the connection is valid
:return: True or False
"""
if self._connection is None:
return False
return self._connection.ping()

def ping_session(self):
"""ping at session level, check whether the session is usable"""
resp = self.execute(r'RETURN "NEBULA PYTHON SESSION PING"')
if resp.is_succeeded():
return True
else:
logger.error(
'failed to ping the session: error code:{}, error message:{}'.format(
resp.error_code, resp.error_msg
)
)
return False

def _reconnect(self):
try:
self._connection.is_used = False
Expand All @@ -255,3 +271,18 @@ def _reconnect(self):

def __del__(self):
self.release()

def _idle_time(self):
"""get idletime of connection
:return: idletime
"""
if self.is_used:
return 0
return (time.time() - self.start_use_time) * 1000

def _sign_out(self):
"""sign out the session"""
if self._connection is None:
raise RuntimeError('The session has been released')
self._connection.signout(self._session_id)
Loading

0 comments on commit 25eb787

Please sign in to comment.