Skip to content

Commit

Permalink
DWX-17085: added compute support for trino
Browse files Browse the repository at this point in the history
With this change, Trino computes can be discovered, saved to the database, and used
similarly to Hive and Impala computes.
  • Loading branch information
amitsrivastava committed Sep 24, 2024
1 parent d7a54dd commit a7e7017
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 35 deletions.
28 changes: 19 additions & 9 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
"""
from __future__ import print_function

import numbers
import re
import time
import numbers

from django import forms

from beeswax.models import Namespace, Compute
from beeswax.models import Compute, Namespace

HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")
HIVE_IDENTIFER_REGEX = re.compile(r"(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")

DL_FORMATS = ['csv', 'xls']

Expand All @@ -44,7 +44,7 @@

RELATION_OPS = ['=', '<>', '<', '<=', '>', '>='] + RELATION_OPS_UNARY

COMPUTE_TYPES = ['hive-compute', 'impala-compute']
COMPUTE_TYPES = ['hive-compute', 'impala-compute', 'trino-compute']

TERMINATORS = [
# (hive representation, description, ascii value)
Expand All @@ -56,12 +56,13 @@
(' ', "Space", 32),
]


def timing(fn):
  """Decorator that prints the wall-clock runtime (in ms) of each call to *fn*.

  Returns a wrapper with the same call signature; the wrapped function's
  return value is passed through unchanged.
  """
  from functools import wraps  # local import keeps the module's import block untouched

  @wraps(fn)  # preserve fn.__name__ so the printed label (and introspection) stays correct
  def decorator(*args, **kwargs):
    time1 = time.time()
    ret = fn(*args, **kwargs)
    time2 = time.time()
    print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2 - time1) * 1000.0))
    return ret
  return decorator

Expand All @@ -79,7 +80,8 @@ def apply_natural_sort(collection, key=None):
Applies a natural sort (http://rosettacode.org/wiki/Natural_sorting) to a list or dictionary
Dictionary types require a sort key to be specified
"""
to_digit = lambda i: int(i) if i.isdigit() else i
def to_digit(i):
return int(i) if i.isdigit() else i

def tokenize_and_convert(item, key=None):
if key:
Expand All @@ -94,7 +96,9 @@ def is_compute(cluster):
return False
connector = cluster.get('connector')
compute = cluster.get('compute')
compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES
return compute_check(cluster) or compute_check(connector) or compute_check(compute)


Expand All @@ -107,12 +111,16 @@ def is_compute(cluster):
3. Lookup namespace based on dialect from cluster or prpvided dialect
and return the first compute filtered by user-access. Needs valid user
'''


def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
if cluster:
# If we find a full/partial cluster object, we will attempt to load a compute
connector = cluster.get('connector')
compute = cluster.get('compute')
compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES

# Pick the most probable compute object
selected_compute = (cluster if compute_check(cluster)
Expand All @@ -135,7 +143,9 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
dialect = selected_compute['dialect'] if selected_compute.get('dialect') else dialect
if not dialect and cluster.get('type'):
t = cluster['type']
dialect = 'hive' if t.startswith('hive') else 'impala' if t.startswith('impala') else None
dialect = 'hive' if t.startswith('hive') else\
'impala' if t.startswith('impala') else\
'trino' if t.startswith('trino') else None

# We will attempt to find a default compute based on other criteria
ns = None
Expand Down
53 changes: 36 additions & 17 deletions apps/beeswax/src/beeswax/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
# limitations under the License.

from __future__ import division
from builtins import str
import logging

import sys
import math
import logging
import os.path
import sys

from desktop.conf import default_ssl_cacerts, default_ssl_validate, AUTH_PASSWORD as DEFAULT_AUTH_PASSWORD,\
AUTH_USERNAME as DEFAULT_AUTH_USERNAME
from desktop.lib.conf import ConfigSection, Config, coerce_bool, coerce_csv, coerce_password_from_script
from builtins import str

if sys.version_info[0] > 2:
from django.utils.translation import gettext_lazy as _t, gettext as _
else:
from django.utils.translation import ugettext_lazy as _t, ugettext as _
from django.utils.translation import gettext as _, gettext_lazy as _t

from desktop.conf import (
AUTH_PASSWORD as DEFAULT_AUTH_PASSWORD,
AUTH_USERNAME as DEFAULT_AUTH_USERNAME,
default_ssl_cacerts,
default_ssl_validate,
)
from desktop.lib.conf import Config, ConfigSection, coerce_bool, coerce_csv, coerce_password_from_script

LOG = logging.getLogger()

Expand Down Expand Up @@ -103,21 +104,25 @@
"the fully-qualified domain name (FQDN) is required"),
default="localhost")


def get_hive_thrift_binary_port():
  """Devise port from core-site Thrift / execution mode & Http port"""
  from beeswax.hive_site import get_hive_execution_mode, hiveserver2_thrift_binary_port  # Cyclic dependency

  configured = hiveserver2_thrift_binary_port()
  if configured:
    return configured
  # No explicit port in hive-site: LLAP mode uses 10500, classic HS2 uses 10000.
  is_llap = (get_hive_execution_mode() or '').lower() == 'llap'
  return 10500 if is_llap else 10000


HIVE_SERVER_PORT = Config(
key="hive_server_port",
help=_t("Configure the binary Thrift port for HiveServer2."),
dynamic_default=get_hive_thrift_binary_port,
type=int)


def get_hive_thrift_http_port():
  """Devise port from core-site Thrift / execution mode & Http port"""
  from beeswax.hive_site import get_hive_execution_mode, hiveserver2_thrift_http_port  # Cyclic dependency

  configured = hiveserver2_thrift_http_port()
  if configured:
    return configured
  # No explicit port in hive-site: LLAP mode uses 10501, classic HS2 uses 10001.
  is_llap = (get_hive_execution_mode() or '').lower() == 'llap'
  return 10501 if is_llap else 10001


HIVE_HTTP_THRIFT_PORT = Config(
key="hive_server_http_port",
Expand Down Expand Up @@ -165,15 +170,15 @@ def get_hive_thrift_http_port():
type=int,
help=_t('Timeout in seconds for zookeeper connection.'))

USE_GET_LOG_API = Config( # To remove in Hue 4
USE_GET_LOG_API = Config( # To remove in Hue 4
key='use_get_log_api',
default=False,
type=coerce_bool,
help=_t('Choose whether to use the old GetLog() Thrift call from before Hive 0.14 to retrieve the logs.'
'If false, use the FetchResults() Thrift call from Hive 1.0 or more instead.')
)

BROWSE_PARTITIONED_TABLE_LIMIT = Config( # Deprecated, to remove in Hue 4
BROWSE_PARTITIONED_TABLE_LIMIT = Config( # Deprecated, to remove in Hue 4
key='browse_partitioned_table_limit',
default=1000,
type=int,
Expand All @@ -187,10 +192,12 @@ def get_hive_thrift_http_port():
type=int,
help=_t('The maximum number of partitions that will be included in the SELECT * LIMIT sample query for partitioned tables.'))


def get_browse_partitioned_table_limit():
  """Get the old default"""
  # Delegate to the deprecated browse_partitioned_table_limit setting.
  limit = BROWSE_PARTITIONED_TABLE_LIMIT.get()
  return limit


LIST_PARTITIONS_LIMIT = Config(
key='list_partitions_limit',
dynamic_default=get_browse_partitioned_table_limit,
Expand All @@ -206,10 +213,12 @@ def get_browse_partitioned_table_limit():
'(e.g. - 10K rows * 1K columns = 10M cells.) '
'A value of -1 means there will be no limit.'))


def get_deprecated_download_cell_limit():
  """Get the old default"""
  cell_limit = DOWNLOAD_CELL_LIMIT.get()
  # A positive cell limit is converted to a row limit assuming ~100 columns;
  # non-positive sentinels (e.g. -1 for "no limit") pass through untouched.
  if cell_limit > 0:
    return math.floor(cell_limit / 100)
  return cell_limit


DOWNLOAD_ROW_LIMIT = Config(
key='download_row_limit',
dynamic_default=get_deprecated_download_cell_limit,
Expand Down Expand Up @@ -297,15 +306,18 @@ def get_deprecated_download_cell_limit():
)
)


def get_auth_username():
  """Get from top level default from desktop"""
  # Beeswax mirrors the desktop-wide auth username rather than defining its own.
  username = DEFAULT_AUTH_USERNAME.get()
  return username


AUTH_USERNAME = Config(
key="auth_username",
help=_t("Auth username of the hue user used for authentications."),
dynamic_default=get_auth_username)


def get_auth_password():
"""Get from script or backward compatibility"""
password = AUTH_PASSWORD_SCRIPT.get()
Expand All @@ -314,6 +326,7 @@ def get_auth_password():

return DEFAULT_AUTH_PASSWORD.get()


AUTH_PASSWORD = Config(
key="auth_password",
help=_t("LDAP/PAM/.. password of the hue user used for authentications."),
Expand All @@ -327,13 +340,15 @@ def get_auth_password():
type=coerce_password_from_script,
default=None)


def get_use_sasl_default():
  """Get from hive_site or backward compatibility"""
  from beeswax.hive_site import get_hiveserver2_authentication, get_use_sasl  # Cyclic dependency

  use_sasl = get_use_sasl()
  if use_sasl is None:
    # Fall back on the HS2 authentication scheme; list kept for backward compatibility.
    return get_hiveserver2_authentication() in ('KERBEROS', 'NONE', 'LDAP', 'PAM')
  # hive-site value is a string flag such as 'true'/'false'.
  return use_sasl.upper() == 'TRUE'


USE_SASL = Config(
key="use_sasl",
Expand All @@ -342,10 +357,12 @@ def get_use_sasl_default():
type=coerce_bool,
dynamic_default=get_use_sasl_default)


def has_multiple_sessions():
  """When true will create multiple sessions for user queries"""
  # Exactly 1 means a single shared session; any other value enables multiples.
  max_sessions = MAX_NUMBER_OF_SESSIONS.get()
  return max_sessions != 1


CLOSE_SESSIONS = Config(
key="close_sessions",
help=_t(
Expand All @@ -356,9 +373,11 @@ def has_multiple_sessions():
dynamic_default=has_multiple_sessions
)


def has_session_pool():
  # A session pool only makes sense when several sessions exist and are kept open.
  if not has_multiple_sessions():
    return False
  return not CLOSE_SESSIONS.get()


MAX_CATALOG_SQL_ENTRIES = Config(
key="max_catalog_sql_entries",
help=_t(
Expand Down
6 changes: 6 additions & 0 deletions desktop/core/src/desktop/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,12 @@ def has_connectors():
return ENABLE_CONNECTORS.get()


def is_cdw_compute_enabled():
  '''When the computes feature is turned on'''
  # True when at least one configured cluster is of type 'cdw'.
  # any() short-circuits on the first match instead of materializing a
  # throwaway list as the previous list-comprehension-in-bool() did.
  clusters = CLUSTERS.get()
  return bool(clusters) and any(c.TYPE.get() == 'cdw' for c in clusters.values())


CLUSTERS = UnspecifiedConfigSection(
"clusters",
help="One entry for each additional remote cluster Hue can interact with.",
Expand Down
54 changes: 52 additions & 2 deletions desktop/core/src/desktop/management/commands/sync_warehouses.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
networking_v1 = client.NetworkingV1Api()

SERVER_HELP = r"""
Sync up the desktop_connectors with the available hive and impala warehouses
Expand All @@ -64,6 +65,7 @@ def sync_warehouses(args, options):

hives = [c for c in computes if c['dialect'] == 'hive']
impalas = [c for c in computes if c['dialect'] == 'impala']
trinos = [c for c in computes if c['dialect'] == 'trino']

(hive_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_HIVE_WAREHOUSE",
Expand All @@ -75,6 +77,11 @@ def sync_warehouses(args, options):
defaults={'name': 'CDW Impala', 'description': 'CDW Impala Warehouse', 'dialect': 'impala', 'interface': 'hiveserver2'})
add_computes_to_warehouse(impala_warehouse, impalas)

(trino_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_TRINO_WAREHOUSE",
defaults={'name': 'CDW Trino', 'description': 'CDW Trino Warehouse', 'dialect': 'trino', 'interface': 'trino'})
add_computes_to_warehouse(trino_warehouse, trinos)

LOG.info("Synced computes")
LOG.debug("Current computes %s" % models.Compute.objects.all())

Expand Down Expand Up @@ -104,7 +111,7 @@ def get_computes_from_k8s():
namespace = n.metadata.name
LOG.info('Getting details for ns: %s' % namespace)
item = {
'name': n.metadata.labels.get('displayname'),
'name': n.metadata.labels.get('displayname', namespace),
'description': '%s (%s)' % (n.metadata.labels.get('displayname'), n.metadata.name),
'external_id': namespace,
# 'creation_timestamp': n.metadata.labels.get('creation_timestamp'),
Expand All @@ -118,8 +125,11 @@ def get_computes_from_k8s():
elif namespace.startswith('impala-'):
populate_impala(namespace, item)
computes.append(item)
elif namespace.startswith('trino-'):
update_trino_configs(namespace, item)
computes.append(item)
except Exception as e:
LOG.exception('Could not get details for ns: %s' % n)
LOG.exception('Could not get details for ns: %s' % (n.metadata.name if n.metadata is not None else n))

return computes

Expand Down Expand Up @@ -224,3 +234,43 @@ def update_impala_configs(namespace, impala, host):
'ldap_groups': ldap_groups.split(",") if ldap_groups else None,
'settings': json.dumps(settings)
})


def update_trino_configs(namespace, trino):
  """Populate the *trino* compute dict from the k8s objects found in *namespace*.

  Looks up the Trino worker deployment, coordinator statefulset, coordinator
  ingress and coordinator config map, then fills in 'is_ready', 'dialect',
  'interface' and a JSON-encoded 'settings' list (auth + coordinator URL).
  Mutates *trino* in place; returns None.
  """
  deployments = apps_v1.list_namespaced_deployment(namespace).items
  stfs = apps_v1.list_namespaced_stateful_set(namespace).items
  ingresses = networking_v1.list_namespaced_ingress(namespace).items
  # NOTE(review): labels['app'] raises KeyError if an unrelated deployment in
  # this namespace has no 'app' label — presumably CDW namespaces always label
  # their objects; confirm.
  trino_worker_dep = next((d for d in deployments
                           if d.metadata.labels['app'] == 'trino' and d.metadata.labels['component'] == 'trino-worker'),
                          None)
  trino_coordinator_stfs = next((s for s in stfs if s.metadata.labels['app'] == 'trino-coordinator'), None)
  trino_coordinator_ingress = next((i for i in ingresses if i.metadata.name == 'trino-coordinator-ingress'), None)

  # Ready only when both the worker deployment and the coordinator statefulset
  # report at least one ready replica.
  trino['is_ready'] = bool(trino_worker_dep and trino_worker_dep.status.ready_replicas
                           and trino_coordinator_stfs and trino_coordinator_stfs.status.ready_replicas)

  if not trino['is_ready']:
    LOG.info("Trino %s not ready" % namespace)

  # Default: in-cluster service URL; replaced below if an ingress exposes the coordinator.
  coordinator_url = 'http://trino-coordinator.%s.svc.cluster.local:8080' % namespace
  settings = []

  trino_coordinator_configs = core_v1.read_namespaced_config_map('trino-coordinator-config', namespace)
  core_site_data = confparse.ConfParse(trino_coordinator_configs.data['core-site.xml'])
  ldap_bin_user = core_site_data.get('hadoop.security.group.mapping.ldap.bind.user')
  if ldap_bin_user:
    # Extract the uid component from the LDAP bind DN (e.g. 'uid=hue,ou=...').
    ldap_user_regex = '.*uid=([^,]+).*'
    match = re.search(ldap_user_regex, ldap_bin_user)
    # NOTE(review): if the DN does not match, auth_username is appended with
    # value None — verify downstream consumers tolerate a null username.
    ldap_user_id = match.group(1) if match and match.group(1) else None
    settings.append({"name": "auth_username", "value": ldap_user_id})
    settings.append({"name": "auth_password_script", "value": "/etc/hue/conf/altscript.sh hue.binduser.password"})
  if trino_coordinator_ingress and trino_coordinator_ingress.spec.rules:
    # Prefer the externally routable ingress host over the in-cluster service.
    coordinator_url = 'https://%s:443' % trino_coordinator_ingress.spec.rules[0].host

  settings.append({"name": "url", "value": coordinator_url})

  trino.update({
    'dialect': 'trino',
    'interface': 'trino',
    'settings': json.dumps(settings)
  })
2 changes: 1 addition & 1 deletion desktop/core/src/desktop/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1909,7 +1909,7 @@ def _get_editor(self):
'optimizer': get_optimizer_mode(),
'page': '/editor/?type=%(type)s' % interpreter,
'is_sql': interpreter['is_sql'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala', 'trino'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
'dialect': interpreter['dialect'],
'dialect_properties': interpreter.get('dialect_properties'),
})
Expand Down
2 changes: 1 addition & 1 deletion desktop/libs/notebook/src/notebook/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ def autocomplete(request, server=None, database=None, table=None, column=None, n
# Passed by check_document_access_permission but unused by APIs
notebook = json.loads(request.POST.get('notebook', '{}'))
cluster = json.loads(request.POST.get('cluster', '{}'))
if cluster and cluster.get('type') in ('hive-compute', 'impala-compute'):
if cluster and cluster.get('type') in ('hive-compute', 'impala-compute', 'trino-compute'):
snippet = cluster
else:
snippet = json.loads(request.POST.get('snippet', '{}'))
Expand Down
Loading

0 comments on commit a7e7017

Please sign in to comment.