Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add workload routing connection property #508

Merged
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ with vertica_python.connect(**conn_info) as connection:
| use_prepared_statements | See [Passing parameters to SQL queries](#passing-parameters-to-sql-queries). <br>**_Default_**: False |
| dsn | See [Set Properties with Connection String](#set-properties-with-connection-string). |
| request_complex_types | See [SQL Data conversion to Python objects](#sql-data-conversion-to-python-objects). <br>**_Default_**: True |
| workload | Sets the workload name associated with this session. Valid values are workload names that already exist in a workload routing rule on the server. If a workload name that doesn't exist is entered, the server will reject it and it will be set to the default. <br>**_Default_**: "" |


Below are a few important connection topics you may deal with, or you can skip and jump to the next section: [Send Queries and Retrieve Results](#send-queries-and-retrieve-results)
Expand Down
4 changes: 2 additions & 2 deletions vertica_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
version_info = (1, 3, 2)
__version__ = '.'.join(map(str, version_info))

# The protocol version (3.14) implemented in this library.
PROTOCOL_VERSION = 3 << 16 | 14
# The protocol version (3.15) implemented in this library.
PROTOCOL_VERSION = 3 << 16 | 15

apilevel = 2.0
threadsafety = 1 # Threads may share the module, but not connections!
Expand Down
23 changes: 23 additions & 0 deletions vertica_python/tests/integration_tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def tearDown(self):
del self._conn_info['session_label']
if 'autocommit' in self._conn_info:
del self._conn_info['autocommit']
if 'workload' in self._conn_info:
del self._conn_info['workload']

def test_client_os_user_name_metadata(self):
value = getpass.getuser()
Expand Down Expand Up @@ -106,5 +108,26 @@ def test_autocommit_off(self):
# Set with attribute setter
conn.autocommit = True
self.assertTrue(conn.autocommit)

def test_workload_default(self):
self.require_protocol_at_least(3 << 16 | 15)
with self._connect() as conn:
query = "SHOW WORKLOAD"
res = self._query_and_fetchone(query)
self.assertEqual(res[0], '')

def test_workload_set_property(self):
self.require_protocol_at_least(3 << 16 | 15)
self._conn_info['workload'] = 'python_test_workload'
with self._connect() as conn:
# we use dc_client_server_messages to test that the client is working properly.
# We do not regularly test on a multi subcluster database and the server will reject this
# workload from the startup packet, returning a parameter status message with an empty string.
query = ("SELECT contents FROM dc_client_server_messages"
" WHERE session_id = current_session()"
" AND message_type = '^+'"
" AND contents LIKE '%workload%'")
res = self._query_and_fetchone(query)
self.assertEqual(res[0], 'workload: python_test_workload')

exec(ConnectionTestCase.createPrepStmtClass())
4 changes: 3 additions & 1 deletion vertica_python/tests/unit_tests/test_parsedsn.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ def test_str_arguments(self):
dsn = ('vertica://john:pwd@localhost:5433/db1?'
'session_label=vpclient&unicode_error=strict&'
'log_path=/home/admin/vClient.log&log_level=DEBUG&'
'workload=python_test_workload&'
'kerberos_service_name=krb_service&kerberos_host_name=krb_host')
expected = {'database': 'db1', 'host': 'localhost', 'user': 'john',
'password': 'pwd', 'port': 5433, 'log_level': 'DEBUG',
'session_label': 'vpclient', 'unicode_error': 'strict',
'log_path': '/home/admin/vClient.log',
'log_path': '/home/admin/vClient.log',
'workload': 'python_test_workload',
'kerberos_service_name': 'krb_service',
'kerberos_host_name': 'krb_host'}
parsed = parse_dsn(dsn)
Expand Down
7 changes: 6 additions & 1 deletion vertica_python/vertica/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
DEFAULT_LOG_PATH = 'vertica_python.log'
DEFAULT_BINARY_TRANSFER = False
DEFAULT_REQUEST_COMPLEX_TYPES = True
DEFAULT_WORKLOAD = ''
try:
DEFAULT_USER = getpass.getuser()
except Exception as e:
Expand Down Expand Up @@ -291,6 +292,7 @@ def __init__(self, options=None):
self.options.setdefault('autocommit', DEFAULT_AUTOCOMMIT)
self.options.setdefault('session_label', _generate_session_label())
self.options.setdefault('backup_server_node', DEFAULT_BACKUP_SERVER_NODE)
self.options.setdefault('workload', DEFAULT_WORKLOAD)
self.options.setdefault('kerberos_service_name', DEFAULT_KRB_SERVICE_NAME)
# Kerberos authentication hostname defaults to the host value here so
# the correct value cannot be overwritten by load balancing or failover
Expand Down Expand Up @@ -332,6 +334,7 @@ def __init__(self, options=None):
# Complex types metadata is returned since protocol version 3.12
self.complex_types_enabled = self.parameters['protocol_version'] >= (3 << 16 | 12) and \
self.parameters.get('request_complex_types', 'off') == 'on'

self._logger.info('Connection is ready')

#############################################
Expand Down Expand Up @@ -823,8 +826,10 @@ def startup_connection(self):
autocommit = self.options['autocommit']
binary_transfer = self.options['binary_transfer']
request_complex_types = self.options['request_complex_types']
workload = self.options['workload']

self.write(messages.Startup(user, database, session_label, os_user_name, autocommit, binary_transfer, request_complex_types))
self.write(messages.Startup(user, database, session_label, os_user_name, autocommit, binary_transfer,
request_complex_types, workload))

while True:
message = self.read_message()
Expand Down
3 changes: 2 additions & 1 deletion vertica_python/vertica/messages/frontend_messages/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class Startup(BulkFrontendMessage):
message_id = None

def __init__(self, user, database, session_label, os_user_name, autocommit,
binary_transfer, request_complex_types):
binary_transfer, request_complex_types, workload):
BulkFrontendMessage.__init__(self)

try:
Expand Down Expand Up @@ -95,6 +95,7 @@ def __init__(self, user, database, session_label, os_user_name, autocommit,
b'binary_data_protocol': '1' if binary_transfer else '0', # Defaults to text format '0'
b'protocol_features': '{"request_complex_types":' + request_complex_types + '}',
b'protocol_compat': 'VER',
b'workload': workload,
Copy link
Member

@sitingren sitingren May 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need protocol version increase?
Your test calls self.require_protocol_at_least(3 << 16 | 15), but the request version in vertica_python/__init__.py is still 3.14.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually since we are just querying the dc client server messages table it won't require a protocol version increase. I'll remove this

Copy link
Member

@sitingren sitingren May 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't remove require_protocol_at_least(). Querying the dc client server messages table need to have server greater than a certain version, so that dc_client_server_messages table is defined.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also requires "SHOW WORKLOAD" which I've implemented on the server in version 23.3. This test can't pass until we have a Vertica image available for 23.3 which is not yet released so it will be a while.

}

def read_bytes(self):
Expand Down