Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix fbthrift timeout bug #126

Merged
merged 1 commit into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nebula2/Exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class IOErrorException(Exception):
E_UNKNOWN = 0
E_ALL_BROKEN = 1
E_CONNECT_BROKEN = 2
E_TIMEOUT = 3
E_NOT_OPEN = 4

def __init__(self, code=E_UNKNOWN, message=None):
Exception.__init__(self, message)
Expand Down
52 changes: 33 additions & 19 deletions nebula2/gclient/net/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,11 @@ def execute(self, stmt):
if not self._reconnect():
logging.warning('Retry connect failed')
raise IOErrorException(IOErrorException.E_ALL_BROKEN, ie.message)
try:
resp = self._connection.execute(self._session_id, stmt)
end_time = time.time()
return ResultSet(resp,
all_latency=int((end_time - start_time) * 1000000),
timezone_offset=self._timezone_offset)
except Exception:
raise
resp = self._connection.execute(self._session_id, stmt)
end_time = time.time()
return ResultSet(resp,
all_latency=int((end_time - start_time) * 1000000),
timezone_offset=self._timezone_offset)
raise
except Exception:
raise
Expand All @@ -117,6 +114,7 @@ def ping(self):

def _reconnect(self):
try:
self._connection.is_used = False
conn = self._pool.get_connection()
if conn is None:
return False
Expand Down Expand Up @@ -273,8 +271,6 @@ def get_connection(self):
return None
except Exception as ex:
logging.error('Get connection failed: {}'.format(ex))
import traceback
print(traceback.format_exc())
return None

def ping(self, address):
Expand Down Expand Up @@ -392,6 +388,7 @@ def __init__(self):
self.start_use_time = time.time()
self._ip = None
self._port = None
self._timeout = 0

def open(self, ip, port, timeout):
"""open the connection
Expand All @@ -403,6 +400,7 @@ def open(self, ip, port, timeout):
"""
self._ip = ip
self._port = port
self._timeout = timeout
try:
s = TSocket.TSocket(self._ip, self._port)
if timeout > 0:
Expand All @@ -414,6 +412,14 @@ def open(self, ip, port, timeout):
except Exception:
raise

def _reopen(self):
"""reopen the connection

:return:
"""
self.close()
self.open(self._ip, self._port, self._timeout)

def authenticate(self, user_name, password):
"""authenticate to graphd

Expand All @@ -427,6 +433,8 @@ def authenticate(self, user_name, password):
raise AuthFailedException(resp.error_msg)
return AuthResult(resp.session_id, resp.time_zone_offset_seconds, resp.time_zone_name)
except TTransportException as te:
if te.message.find("timed out"):
self._reopen()
if te.type == TTransportException.END_OF_FILE:
self.close()
raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
Expand All @@ -441,10 +449,18 @@ def execute(self, session_id, stmt):
try:
resp = self._connection.execute(session_id, stmt)
return resp
except TTransportException as te:
if te.type == TTransportException.END_OF_FILE:
self.close()
raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
except Exception as te:
if isinstance(te, TTransportException):
if te.message.find("timed out") > 0:
self._reopen()
raise IOErrorException(IOErrorException.E_TIMEOUT, te.message)
elif te.type == TTransportException.END_OF_FILE:
raise IOErrorException(IOErrorException.E_CONNECT_BROKEN, te.message)
elif te.type == TTransportException.NOT_OPEN:
raise IOErrorException(IOErrorException.E_NOT_OPEN, te.message)
else:
raise IOErrorException(IOErrorException.E_UNKNOWN, te.message);
raise

def signout(self, session_id):
"""tells the graphd can release the session info
Expand Down Expand Up @@ -473,12 +489,10 @@ def ping(self):
:return: True or False
"""
try:
self._connection.execute(0, 'YIELD 1;')
resp = self._connection.execute(0, 'YIELD 1;')
return True
except TTransportException as te:
if te.type == TTransportException.END_OF_FILE:
return False
return True
except Exception:
return False

def reset(self):
"""reset the idletime
Expand Down
21 changes: 21 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
import threading
import time
import pytest

current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.join(current_dir, '..')
Expand Down Expand Up @@ -142,6 +143,26 @@ def test_stop_close(self):
except Exception as e:
assert False, "We don't expect reach here:".format(e)

@pytest.mark.skip(reason="the test data without nba")
def test_timeout(self):
config = Config()
config.timeout = 1000
config.max_connection_pool_size = 1
pool = ConnectionPool()
assert pool.init([('127.0.0.1', 9669)], config)
session = pool.get_session('root', 'nebula')
try:
resp = session.execute('USE nba;GO 1000 STEPS FROM \"Tim Duncan\" OVER like')
assert False
except IOErrorException as e:
assert True
assert str(e).find("Read timed out")
session.release()
try:
session = pool.get_session('root', 'nebula')
except IOErrorException as e:
assert False


def test_multi_thread():
# Test multi thread
Expand Down