Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[trino][compute] Add compute support for Trino #3837

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
"""
from __future__ import print_function

import numbers
import re
import time
import numbers

from django import forms

from beeswax.models import Namespace, Compute
from beeswax.models import Compute, Namespace

HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")
HIVE_IDENTIFER_REGEX = re.compile(r"(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")

DL_FORMATS = ['csv', 'xls']

Expand All @@ -44,7 +44,7 @@

RELATION_OPS = ['=', '<>', '<', '<=', '>', '>='] + RELATION_OPS_UNARY

COMPUTE_TYPES = ['hive-compute', 'impala-compute']
COMPUTE_TYPES = ['hive-compute', 'impala-compute', 'trino-compute']

TERMINATORS = [
# (hive representation, description, ascii value)
Expand All @@ -56,12 +56,13 @@
(' ', "Space", 32),
]


def timing(fn):
    """Decorator that prints how long each call to `fn` took, in milliseconds.

    Returns a wrapper with `fn`'s metadata preserved (name, docstring), so
    introspection and the printed name stay correct for decorated functions.
    """
    import functools  # local import: keeps the file's top-level imports untouched

    @functools.wraps(fn)  # without this the wrapper reports as 'decorator'
    def decorator(*args, **kwargs):
        time1 = time.time()
        ret = fn(*args, **kwargs)
        time2 = time.time()
        print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2 - time1) * 1000.0))
        return ret
    return decorator

Expand All @@ -79,7 +80,8 @@ def apply_natural_sort(collection, key=None):
Applies a natural sort (http://rosettacode.org/wiki/Natural_sorting) to a list or dictionary
Dictionary types require a sort key to be specified
"""
to_digit = lambda i: int(i) if i.isdigit() else i
def to_digit(i):
return int(i) if i.isdigit() else i

def tokenize_and_convert(item, key=None):
if key:
Expand All @@ -94,7 +96,9 @@ def is_compute(cluster):
return False
connector = cluster.get('connector')
compute = cluster.get('compute')
compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES
return compute_check(cluster) or compute_check(connector) or compute_check(compute)


Expand All @@ -107,12 +111,16 @@ def is_compute(cluster):
3. Lookup namespace based on dialect from cluster or provided dialect
and return the first compute filtered by user-access. Needs valid user
'''


def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
if cluster:
# If we find a full/partial cluster object, we will attempt to load a compute
connector = cluster.get('connector')
compute = cluster.get('compute')
compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES

# Pick the most probable compute object
selected_compute = (cluster if compute_check(cluster)
Expand All @@ -135,7 +143,9 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
dialect = selected_compute['dialect'] if selected_compute.get('dialect') else dialect
if not dialect and cluster.get('type'):
t = cluster['type']
dialect = 'hive' if t.startswith('hive') else 'impala' if t.startswith('impala') else None
dialect = 'hive' if t.startswith('hive') else\
'impala' if t.startswith('impala') else\
'trino' if t.startswith('trino') else None

Comment on lines +146 to 149
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is formatting correct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pycharm likes it this way and ruff also doesn't complain so I didn't bother much. On a closer look, the L148 is actually part of the if started on L147, so an extra indent might be okay.

# We will attempt to find a default compute based on other criteria
ns = None
Expand Down
53 changes: 36 additions & 17 deletions apps/beeswax/src/beeswax/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
# limitations under the License.

from __future__ import division
from builtins import str
import logging

import sys
import math
import logging
import os.path
import sys

from desktop.conf import default_ssl_cacerts, default_ssl_validate, AUTH_PASSWORD as DEFAULT_AUTH_PASSWORD,\
AUTH_USERNAME as DEFAULT_AUTH_USERNAME
from desktop.lib.conf import ConfigSection, Config, coerce_bool, coerce_csv, coerce_password_from_script
from builtins import str

if sys.version_info[0] > 2:
from django.utils.translation import gettext_lazy as _t, gettext as _
else:
from django.utils.translation import ugettext_lazy as _t, ugettext as _
from django.utils.translation import gettext as _, gettext_lazy as _t

from desktop.conf import (
AUTH_PASSWORD as DEFAULT_AUTH_PASSWORD,
AUTH_USERNAME as DEFAULT_AUTH_USERNAME,
default_ssl_cacerts,
default_ssl_validate,
)
from desktop.lib.conf import Config, ConfigSection, coerce_bool, coerce_csv, coerce_password_from_script

LOG = logging.getLogger()

Expand Down Expand Up @@ -103,21 +104,25 @@
"the fully-qualified domain name (FQDN) is required"),
default="localhost")


def get_hive_thrift_binary_port():
    """Devise the binary Thrift port from hive-site, falling back on execution mode.

    Returns the explicitly configured HiveServer2 binary port when present;
    otherwise 10500 for LLAP execution mode, 10000 for classic HiveServer2.
    """
    from beeswax.hive_site import get_hive_execution_mode, hiveserver2_thrift_binary_port  # Cyclic dependency
    return hiveserver2_thrift_binary_port() or (10500 if (get_hive_execution_mode() or '').lower() == 'llap' else 10000)


HIVE_SERVER_PORT = Config(
key="hive_server_port",
help=_t("Configure the binary Thrift port for HiveServer2."),
dynamic_default=get_hive_thrift_binary_port,
type=int)


def get_hive_thrift_http_port():
    """Devise the HTTP Thrift port from hive-site, falling back on execution mode.

    Returns the explicitly configured HiveServer2 HTTP port when present;
    otherwise 10501 for LLAP execution mode, 10001 for classic HiveServer2.
    """
    from beeswax.hive_site import get_hive_execution_mode, hiveserver2_thrift_http_port  # Cyclic dependency
    return hiveserver2_thrift_http_port() or (10501 if (get_hive_execution_mode() or '').lower() == 'llap' else 10001)


HIVE_HTTP_THRIFT_PORT = Config(
key="hive_server_http_port",
Expand Down Expand Up @@ -165,15 +170,15 @@ def get_hive_thrift_http_port():
type=int,
help=_t('Timeout in seconds for zookeeper connection.'))

USE_GET_LOG_API = Config( # To remove in Hue 4
USE_GET_LOG_API = Config( # To remove in Hue 4
key='use_get_log_api',
default=False,
type=coerce_bool,
help=_t('Choose whether to use the old GetLog() Thrift call from before Hive 0.14 to retrieve the logs.'
'If false, use the FetchResults() Thrift call from Hive 1.0 or more instead.')
)

BROWSE_PARTITIONED_TABLE_LIMIT = Config( # Deprecated, to remove in Hue 4
BROWSE_PARTITIONED_TABLE_LIMIT = Config( # Deprecated, to remove in Hue 4
key='browse_partitioned_table_limit',
default=1000,
type=int,
Expand All @@ -187,10 +192,12 @@ def get_hive_thrift_http_port():
type=int,
help=_t('The maximum number of partitions that will be included in the SELECT * LIMIT sample query for partitioned tables.'))


def get_browse_partitioned_table_limit():
    """Return the deprecated browse_partitioned_table_limit as the dynamic default."""
    legacy_limit = BROWSE_PARTITIONED_TABLE_LIMIT.get()
    return legacy_limit


LIST_PARTITIONS_LIMIT = Config(
key='list_partitions_limit',
dynamic_default=get_browse_partitioned_table_limit,
Expand All @@ -206,10 +213,12 @@ def get_browse_partitioned_table_limit():
'(e.g. - 10K rows * 1K columns = 10M cells.) '
'A value of -1 means there will be no limit.'))


def get_deprecated_download_cell_limit():
    """Derive the legacy download row limit from the configured cell limit.

    Positive cell limits are divided by 100 (presumably an assumed column
    count per row — TODO confirm); non-positive values (e.g. -1 meaning
    'no limit') are passed through unchanged.
    """
    # Read the config once instead of up to three times.
    cell_limit = DOWNLOAD_CELL_LIMIT.get()
    return math.floor(cell_limit / 100) if cell_limit > 0 else cell_limit


DOWNLOAD_ROW_LIMIT = Config(
key='download_row_limit',
dynamic_default=get_deprecated_download_cell_limit,
Expand Down Expand Up @@ -297,15 +306,18 @@ def get_deprecated_download_cell_limit():
)
)


def get_auth_username():
    """Fall back to the top-level desktop auth username."""
    desktop_default = DEFAULT_AUTH_USERNAME.get()
    return desktop_default


AUTH_USERNAME = Config(
key="auth_username",
help=_t("Auth username of the hue user used for authentications."),
dynamic_default=get_auth_username)


def get_auth_password():
"""Get from script or backward compatibility"""
password = AUTH_PASSWORD_SCRIPT.get()
Expand All @@ -314,6 +326,7 @@ def get_auth_password():

return DEFAULT_AUTH_PASSWORD.get()


AUTH_PASSWORD = Config(
key="auth_password",
help=_t("LDAP/PAM/.. password of the hue user used for authentications."),
Expand All @@ -327,13 +340,15 @@ def get_auth_password():
type=coerce_password_from_script,
default=None)


def get_use_sasl_default():
    """Get SASL usage from hive_site, with backward-compatible fallback.

    An explicit hive.server2 use-SASL value wins; otherwise infer from the
    configured HiveServer2 authentication mechanism.
    """
    from beeswax.hive_site import get_hiveserver2_authentication, get_use_sasl  # Cyclic dependency
    use_sasl = get_use_sasl()
    if use_sasl is not None:
        return use_sasl.upper() == 'TRUE'
    return get_hiveserver2_authentication() in ('KERBEROS', 'NONE', 'LDAP', 'PAM')  # list for backward compatibility


USE_SASL = Config(
key="use_sasl",
Expand All @@ -342,10 +357,12 @@ def get_use_sasl_default():
type=coerce_bool,
dynamic_default=get_use_sasl_default)


def has_multiple_sessions():
    """When true will create multiple sessions for user queries"""
    session_cap = MAX_NUMBER_OF_SESSIONS.get()
    return session_cap != 1


CLOSE_SESSIONS = Config(
key="close_sessions",
help=_t(
Expand All @@ -356,9 +373,11 @@ def has_multiple_sessions():
dynamic_default=has_multiple_sessions
)


def has_session_pool():
    """Sessions are pooled when multiple sessions are enabled and not closed after use."""
    if not has_multiple_sessions():
        return False
    return not CLOSE_SESSIONS.get()


MAX_CATALOG_SQL_ENTRIES = Config(
key="max_catalog_sql_entries",
help=_t(
Expand Down
6 changes: 6 additions & 0 deletions desktop/core/src/desktop/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,12 @@ def has_connectors():
return ENABLE_CONNECTORS.get()


def is_cdw_compute_enabled():
    '''When the computes feature is turned on, i.e. at least one configured cluster has type 'cdw'.'''
    clusters = CLUSTERS.get()
    # any() short-circuits on the first match instead of building a throwaway list.
    return bool(clusters) and any(c.TYPE.get() == 'cdw' for c in clusters.values())


CLUSTERS = UnspecifiedConfigSection(
"clusters",
help="One entry for each additional remote cluster Hue can interact with.",
Expand Down
54 changes: 52 additions & 2 deletions desktop/core/src/desktop/management/commands/sync_warehouses.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel at some point in time, having an API endpoint (or few of them) in Hue for this could be beneficial.

Maybe then we don't need to depend on CDW layer running the command at a set interval and service discovery can be more dynamic and on-demand?

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
networking_v1 = client.NetworkingV1Api()

SERVER_HELP = r"""
Sync up the desktop_connectors with the available hive and impala warehouses
Expand All @@ -64,6 +65,7 @@ def sync_warehouses(args, options):

hives = [c for c in computes if c['dialect'] == 'hive']
impalas = [c for c in computes if c['dialect'] == 'impala']
trinos = [c for c in computes if c['dialect'] == 'trino']

(hive_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_HIVE_WAREHOUSE",
Expand All @@ -75,6 +77,11 @@ def sync_warehouses(args, options):
defaults={'name': 'CDW Impala', 'description': 'CDW Impala Warehouse', 'dialect': 'impala', 'interface': 'hiveserver2'})
add_computes_to_warehouse(impala_warehouse, impalas)

(trino_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_TRINO_WAREHOUSE",
defaults={'name': 'CDW Trino', 'description': 'CDW Trino Warehouse', 'dialect': 'trino', 'interface': 'trino'})
add_computes_to_warehouse(trino_warehouse, trinos)

LOG.info("Synced computes")
LOG.debug("Current computes %s" % models.Compute.objects.all())

Expand Down Expand Up @@ -104,7 +111,7 @@ def get_computes_from_k8s():
namespace = n.metadata.name
LOG.info('Getting details for ns: %s' % namespace)
item = {
'name': n.metadata.labels.get('displayname'),
'name': n.metadata.labels.get('displayname', namespace),
'description': '%s (%s)' % (n.metadata.labels.get('displayname'), n.metadata.name),
'external_id': namespace,
# 'creation_timestamp': n.metadata.labels.get('creation_timestamp'),
Expand All @@ -118,8 +125,11 @@ def get_computes_from_k8s():
elif namespace.startswith('impala-'):
populate_impala(namespace, item)
computes.append(item)
elif namespace.startswith('trino-'):
update_trino_configs(namespace, item)
computes.append(item)
except Exception as e:
LOG.exception('Could not get details for ns: %s' % n)
LOG.exception('Could not get details for ns: %s' % (n.metadata.name if n.metadata is not None else n))

return computes

Expand Down Expand Up @@ -224,3 +234,43 @@ def update_impala_configs(namespace, impala, host):
'ldap_groups': ldap_groups.split(",") if ldap_groups else None,
'settings': json.dumps(settings)
})


def update_trino_configs(namespace, trino):
    """Populate the `trino` compute dict from a Trino Kubernetes namespace.

    Mutates `trino` in place: sets 'is_ready' from worker/coordinator replica
    status, and adds 'dialect', 'interface' and a JSON-encoded 'settings' list
    containing the coordinator URL and, when LDAP is configured, auth settings.

    Args:
      namespace: the k8s namespace to inspect (e.g. 'trino-...').
      trino: the compute dict being assembled for this namespace.
    """
    deployments = apps_v1.list_namespaced_deployment(namespace).items
    stfs = apps_v1.list_namespaced_stateful_set(namespace).items
    ingresses = networking_v1.list_namespaced_ingress(namespace).items

    # Use .get() on labels (which may also be None entirely) so an unrelated
    # object without 'app'/'component' labels cannot KeyError the whole sync.
    trino_worker_dep = next(
        (d for d in deployments
         if (d.metadata.labels or {}).get('app') == 'trino'
         and (d.metadata.labels or {}).get('component') == 'trino-worker'),
        None)
    trino_coordinator_stfs = next(
        (s for s in stfs if (s.metadata.labels or {}).get('app') == 'trino-coordinator'),
        None)
    trino_coordinator_ingress = next(
        (i for i in ingresses if i.metadata.name == 'trino-coordinator-ingress'),
        None)

    trino['is_ready'] = bool(trino_worker_dep and trino_worker_dep.status.ready_replicas
                             and trino_coordinator_stfs and trino_coordinator_stfs.status.ready_replicas)

    if not trino['is_ready']:
        LOG.info("Trino %s not ready" % namespace)

    # In-cluster default; replaced below when an ingress exposes the coordinator.
    coordinator_url = 'http://trino-coordinator.%s.svc.cluster.local:8080' % namespace
    settings = []

    trino_coordinator_configs = core_v1.read_namespaced_config_map('trino-coordinator-config', namespace)
    core_site_data = confparse.ConfParse(trino_coordinator_configs.data['core-site.xml'])
    ldap_bin_user = core_site_data.get('hadoop.security.group.mapping.ldap.bind.user')
    if ldap_bin_user:
        # Pull the uid component out of the LDAP bind DN, e.g. 'uid=hue,ou=...'.
        ldap_user_regex = r'.*uid=([^,]+).*'
        match = re.search(ldap_user_regex, ldap_bin_user)
        ldap_user_id = match.group(1) if match and match.group(1) else None
        settings.append({"name": "auth_username", "value": ldap_user_id})
        settings.append({"name": "auth_password_script", "value": "/etc/hue/conf/altscript.sh hue.binduser.password"})
    if trino_coordinator_ingress and trino_coordinator_ingress.spec.rules:
        coordinator_url = 'https://%s:443' % trino_coordinator_ingress.spec.rules[0].host

    settings.append({"name": "url", "value": coordinator_url})

    trino.update({
        'dialect': 'trino',
        'interface': 'trino',
        'settings': json.dumps(settings)
    })
2 changes: 1 addition & 1 deletion desktop/core/src/desktop/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1909,7 +1909,7 @@ def _get_editor(self):
'optimizer': get_optimizer_mode(),
'page': '/editor/?type=%(type)s' % interpreter,
'is_sql': interpreter['is_sql'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala', 'trino'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required to enable batch queries for Trino?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If hive and impala are batch-able then why not trino? it seems to work fine on the frontend. Is there a reason not to do it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even I was curious if Trino supports batch-querying in Hue OOTB. Good that the frontend is looking good.

@agl29 - Let's validate and test this out when implementing V2?

'dialect': interpreter['dialect'],
'dialect_properties': interpreter.get('dialect_properties'),
})
Expand Down
2 changes: 1 addition & 1 deletion desktop/libs/notebook/src/notebook/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ def autocomplete(request, server=None, database=None, table=None, column=None, n
# Passed by check_document_access_permission but unused by APIs
notebook = json.loads(request.POST.get('notebook', '{}'))
cluster = json.loads(request.POST.get('cluster', '{}'))
if cluster and cluster.get('type') in ('hive-compute', 'impala-compute'):
if cluster and cluster.get('type') in ('hive-compute', 'impala-compute', 'trino-compute'):
snippet = cluster
else:
snippet = json.loads(request.POST.get('snippet', '{}'))
Expand Down
Loading