From a7e7017b4a9348936d869716baba4e696cd94ec9 Mon Sep 17 00:00:00 2001 From: Amit Srivastava Date: Wed, 18 Sep 2024 15:43:33 -0700 Subject: [PATCH] DWX-17085: added compute support for trino With this change, trino compute can be discovered, saved to db and used similar to hive and impala. --- apps/beeswax/src/beeswax/common.py | 28 ++++++---- apps/beeswax/src/beeswax/conf.py | 53 ++++++++++++------ desktop/core/src/desktop/conf.py | 6 +++ .../management/commands/sync_warehouses.py | 54 ++++++++++++++++++- desktop/core/src/desktop/models.py | 2 +- desktop/libs/notebook/src/notebook/api.py | 2 +- desktop/libs/notebook/src/notebook/conf.py | 2 +- .../notebook/src/notebook/connectors/base.py | 6 +-- .../notebook/src/notebook/connectors/trino.py | 2 +- 9 files changed, 120 insertions(+), 35 deletions(-) diff --git a/apps/beeswax/src/beeswax/common.py b/apps/beeswax/src/beeswax/common.py index e3401e86ff1..7a759b1a8f4 100644 --- a/apps/beeswax/src/beeswax/common.py +++ b/apps/beeswax/src/beeswax/common.py @@ -20,15 +20,15 @@ """ from __future__ import print_function -import numbers import re import time +import numbers from django import forms -from beeswax.models import Namespace, Compute +from beeswax.models import Compute, Namespace -HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$") +HIVE_IDENTIFER_REGEX = re.compile(r"(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$") DL_FORMATS = ['csv', 'xls'] @@ -44,7 +44,7 @@ RELATION_OPS = ['=', '<>', '<', '<=', '>', '>='] + RELATION_OPS_UNARY -COMPUTE_TYPES = ['hive-compute', 'impala-compute'] +COMPUTE_TYPES = ['hive-compute', 'impala-compute', 'trino-compute'] TERMINATORS = [ # (hive representation, description, ascii value) @@ -56,12 +56,13 @@ (' ', "Space", 32), ] + def timing(fn): def decorator(*args, **kwargs): time1 = time.time() ret = fn(*args, **kwargs) time2 = time.time() - print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2-time1)*1000.0)) + print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2 - time1) * 1000.0)) return ret return decorator @@ -79,7 +80,8 @@ def apply_natural_sort(collection, key=None): Applies a natural sort (http://rosettacode.org/wiki/Natural_sorting) to a list or dictionary Dictionary types require a sort key to be specified """ - to_digit = lambda i: int(i) if i.isdigit() else i + def to_digit(i): + return int(i) if i.isdigit() else i def tokenize_and_convert(item, key=None): if key: @@ -94,7 +96,9 @@ def is_compute(cluster): return False connector = cluster.get('connector') compute = cluster.get('compute') - compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES + + def compute_check(x): + return x and x.get('type') in COMPUTE_TYPES return compute_check(cluster) or compute_check(connector) or compute_check(compute) @@ -107,12 +111,16 @@ def is_compute(cluster): 3. Lookup namespace based on dialect from cluster or prpvided dialect and return the first compute filtered by user-access. Needs valid user ''' + + def find_compute(cluster=None, user=None, dialect=None, namespace_id=None): if cluster: # If we find a full/partial cluster object, we will attempt to load a compute connector = cluster.get('connector') compute = cluster.get('compute') - compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES + + def compute_check(x): + return x and x.get('type') in COMPUTE_TYPES # Pick the most probable compute object selected_compute = (cluster if compute_check(cluster) @@ -135,7 +143,9 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None): dialect = selected_compute['dialect'] if selected_compute.get('dialect') else dialect if not dialect and cluster.get('type'): t = cluster['type'] - dialect = 'hive' if t.startswith('hive') else 'impala' if t.startswith('impala') else None + dialect = 'hive' if t.startswith('hive') else\ + 'impala' if t.startswith('impala') else\ + 'trino' if t.startswith('trino') else None # We will attempt to find a default compute based on other criteria ns = None diff --git a/apps/beeswax/src/beeswax/conf.py b/apps/beeswax/src/beeswax/conf.py index e1b05b21c4d..032ea79010a 100644 --- a/apps/beeswax/src/beeswax/conf.py +++ b/apps/beeswax/src/beeswax/conf.py @@ -16,21 +16,22 @@ # limitations under the License. from __future__ import division -from builtins import str -import logging + +import sys import math +import logging import os.path -import sys - -from desktop.conf import default_ssl_cacerts, default_ssl_validate, AUTH_PASSWORD as DEFAULT_AUTH_PASSWORD,\ - AUTH_USERNAME as DEFAULT_AUTH_USERNAME -from desktop.lib.conf import ConfigSection, Config, coerce_bool, coerce_csv, coerce_password_from_script +from builtins import str -if sys.version_info[0] > 2: - from django.utils.translation import gettext_lazy as _t, gettext as _ -else: - from django.utils.translation import ugettext_lazy as _t, ugettext as _ +from django.utils.translation import gettext as _, gettext_lazy as _t +from desktop.conf import ( + AUTH_PASSWORD as DEFAULT_AUTH_PASSWORD, + AUTH_USERNAME as DEFAULT_AUTH_USERNAME, + default_ssl_cacerts, + default_ssl_validate, +) +from desktop.lib.conf import Config, ConfigSection, coerce_bool, coerce_csv, coerce_password_from_script LOG = logging.getLogger() @@ -103,21 +104,25 @@ "the fully-qualified domain name (FQDN) is required"), default="localhost") + def get_hive_thrift_binary_port(): """Devise port from core-site Thrift / execution mode & Http port""" - from beeswax.hive_site import hiveserver2_thrift_binary_port, get_hive_execution_mode # Cyclic dependency + from beeswax.hive_site import get_hive_execution_mode, hiveserver2_thrift_binary_port # Cyclic dependency return hiveserver2_thrift_binary_port() or (10500 if (get_hive_execution_mode() or '').lower() == 'llap' else 10000) + HIVE_SERVER_PORT = Config( key="hive_server_port", help=_t("Configure the binary Thrift port for HiveServer2."), dynamic_default=get_hive_thrift_binary_port, type=int) + def get_hive_thrift_http_port(): """Devise port from core-site Thrift / execution mode & Http port""" - from beeswax.hive_site import hiveserver2_thrift_http_port, get_hive_execution_mode # Cyclic dependency - return hiveserver2_thrift_http_port() or (10501 if (get_hive_execution_mode() or '').lower() == 'llap' else 10001) + from beeswax.hive_site import get_hive_execution_mode, hiveserver2_thrift_http_port # Cyclic dependency + return hiveserver2_thrift_http_port() or (10501 if (get_hive_execution_mode() or '').lower() == 'llap' else 10001) + HIVE_HTTP_THRIFT_PORT = Config( key="hive_server_http_port", @@ -165,7 +170,7 @@ def get_hive_thrift_http_port(): type=int, help=_t('Timeout in seconds for zookeeper connection.')) -USE_GET_LOG_API = Config( # To remove in Hue 4 +USE_GET_LOG_API = Config( # To remove in Hue 4 key='use_get_log_api', default=False, type=coerce_bool, @@ -173,7 +178,7 @@ def get_hive_thrift_http_port(): 'If false, use the FetchResults() Thrift call from Hive 1.0 or more instead.') ) -BROWSE_PARTITIONED_TABLE_LIMIT = Config( # Deprecated, to remove in Hue 4 +BROWSE_PARTITIONED_TABLE_LIMIT = Config( # Deprecated, to remove in Hue 4 key='browse_partitioned_table_limit', default=1000, type=int, @@ -187,10 +192,12 @@ def get_hive_thrift_http_port(): type=int, help=_t('The maximum number of partitions that will be included in the SELECT * LIMIT sample query for partitioned tables.')) + def get_browse_partitioned_table_limit(): """Get the old default""" return BROWSE_PARTITIONED_TABLE_LIMIT.get() + LIST_PARTITIONS_LIMIT = Config( key='list_partitions_limit', dynamic_default=get_browse_partitioned_table_limit, @@ -206,10 +213,12 @@ def get_browse_partitioned_table_limit(): '(e.g. - 10K rows * 1K columns = 10M cells.) ' 'A value of -1 means there will be no limit.')) + def get_deprecated_download_cell_limit(): """Get the old default""" return math.floor(DOWNLOAD_CELL_LIMIT.get() / 100) if DOWNLOAD_CELL_LIMIT.get() > 0 else DOWNLOAD_CELL_LIMIT.get() + DOWNLOAD_ROW_LIMIT = Config( key='download_row_limit', dynamic_default=get_deprecated_download_cell_limit, @@ -297,15 +306,18 @@ def get_deprecated_download_cell_limit(): ) ) + def get_auth_username(): """Get from top level default from desktop""" return DEFAULT_AUTH_USERNAME.get() + AUTH_USERNAME = Config( key="auth_username", help=_t("Auth username of the hue user used for authentications."), dynamic_default=get_auth_username) + def get_auth_password(): """Get from script or backward compatibility""" password = AUTH_PASSWORD_SCRIPT.get() @@ -314,6 +326,7 @@ def get_auth_password(): return DEFAULT_AUTH_PASSWORD.get() + AUTH_PASSWORD = Config( key="auth_password", help=_t("LDAP/PAM/.. password of the hue user used for authentications."), @@ -327,13 +340,15 @@ def get_auth_password(): type=coerce_password_from_script, default=None) + def get_use_sasl_default(): """Get from hive_site or backward compatibility""" from beeswax.hive_site import get_hiveserver2_authentication, get_use_sasl # Cyclic dependency use_sasl = get_use_sasl() if use_sasl is not None: return use_sasl.upper() == 'TRUE' - return get_hiveserver2_authentication() in ('KERBEROS', 'NONE', 'LDAP', 'PAM') # list for backward compatibility + return get_hiveserver2_authentication() in ('KERBEROS', 'NONE', 'LDAP', 'PAM') # list for backward compatibility + USE_SASL = Config( key="use_sasl", @@ -342,10 +357,12 @@ def get_use_sasl_default(): type=coerce_bool, dynamic_default=get_use_sasl_default) + def has_multiple_sessions(): """When true will create multiple sessions for user queries""" return MAX_NUMBER_OF_SESSIONS.get() != 1 + CLOSE_SESSIONS = Config( key="close_sessions", help=_t( @@ -356,9 +373,11 @@ def has_multiple_sessions(): dynamic_default=has_multiple_sessions ) + def has_session_pool(): return has_multiple_sessions() and not CLOSE_SESSIONS.get() + MAX_CATALOG_SQL_ENTRIES = Config( key="max_catalog_sql_entries", help=_t( diff --git a/desktop/core/src/desktop/conf.py b/desktop/core/src/desktop/conf.py index f788adfbdeb..d8bf1c6b967 100644 --- a/desktop/core/src/desktop/conf.py +++ b/desktop/core/src/desktop/conf.py @@ -2258,6 +2258,12 @@ def has_connectors(): return ENABLE_CONNECTORS.get() +def is_cdw_compute_enabled(): + '''When the computes feature is turned on''' + clusters = CLUSTERS.get() + return bool(clusters and [c for c in clusters.values() if c.TYPE.get() == 'cdw']) + + CLUSTERS = UnspecifiedConfigSection( "clusters", help="One entry for each additional remote cluster Hue can interact with.", diff --git a/desktop/core/src/desktop/management/commands/sync_warehouses.py b/desktop/core/src/desktop/management/commands/sync_warehouses.py index 432b9f018ce..f1159fca3c9 100644 --- a/desktop/core/src/desktop/management/commands/sync_warehouses.py +++ b/desktop/core/src/desktop/management/commands/sync_warehouses.py @@ -42,6 +42,7 @@ core_v1 = client.CoreV1Api() apps_v1 = client.AppsV1Api() +networking_v1 = client.NetworkingV1Api() SERVER_HELP = r""" Sync up the desktop_connectors with the available hive and impala warehouses @@ -64,6 +65,7 @@ def sync_warehouses(args, options): hives = [c for c in computes if c['dialect'] == 'hive'] impalas = [c for c in computes if c['dialect'] == 'impala'] + trinos = [c for c in computes if c['dialect'] == 'trino'] (hive_warehouse, created) = models.Namespace.objects.get_or_create( external_id="CDW_HIVE_WAREHOUSE", @@ -75,6 +77,11 @@ def sync_warehouses(args, options): defaults={'name': 'CDW Impala', 'description': 'CDW Impala Warehouse', 'dialect': 'impala', 'interface': 'hiveserver2'}) add_computes_to_warehouse(impala_warehouse, impalas) + (trino_warehouse, created) = models.Namespace.objects.get_or_create( + external_id="CDW_TRINO_WAREHOUSE", + defaults={'name': 'CDW Trino', 'description': 'CDW Trino Warehouse', 'dialect': 'trino', 'interface': 'trino'}) + add_computes_to_warehouse(trino_warehouse, trinos) + LOG.info("Synced computes") LOG.debug("Current computes %s" % models.Compute.objects.all()) @@ -104,7 +111,7 @@ def get_computes_from_k8s(): namespace = n.metadata.name LOG.info('Getting details for ns: %s' % namespace) item = { - 'name': n.metadata.labels.get('displayname'), + 'name': n.metadata.labels.get('displayname', namespace), 'description': '%s (%s)' % (n.metadata.labels.get('displayname'), n.metadata.name), 'external_id': namespace, # 'creation_timestamp': n.metadata.labels.get('creation_timestamp'), @@ -118,8 +125,11 @@ def get_computes_from_k8s(): elif namespace.startswith('impala-'): populate_impala(namespace, item) computes.append(item) + elif namespace.startswith('trino-'): + update_trino_configs(namespace, item) + computes.append(item) except Exception as e: - LOG.exception('Could not get details for ns: %s' % n) + LOG.exception('Could not get details for ns: %s' % (n.metadata.name if n.metadata is not None else n)) return computes @@ -224,3 +234,43 @@ def update_impala_configs(namespace, impala, host): 'ldap_groups': ldap_groups.split(",") if ldap_groups else None, 'settings': json.dumps(settings) }) + + +def update_trino_configs(namespace, trino): + deployments = apps_v1.list_namespaced_deployment(namespace).items + stfs = apps_v1.list_namespaced_stateful_set(namespace).items + ingresses = networking_v1.list_namespaced_ingress(namespace).items + trino_worker_dep = next((d for d in deployments + if d.metadata.labels['app'] == 'trino' and d.metadata.labels['component'] == 'trino-worker'), + None) + trino_coordinator_stfs = next((s for s in stfs if s.metadata.labels['app'] == 'trino-coordinator'), None) + trino_coordinator_ingress = next((i for i in ingresses if i.metadata.name == 'trino-coordinator-ingress'), None) + + trino['is_ready'] = bool(trino_worker_dep and trino_worker_dep.status.ready_replicas + and trino_coordinator_stfs and trino_coordinator_stfs.status.ready_replicas) + + if not trino['is_ready']: + LOG.info("Trino %s not ready" % namespace) + + coordinator_url = 'http://trino-coordinator.%s.svc.cluster.local:8080' % namespace + settings = [] + + trino_coordinator_configs = core_v1.read_namespaced_config_map('trino-coordinator-config', namespace) + core_site_data = confparse.ConfParse(trino_coordinator_configs.data['core-site.xml']) + ldap_bin_user = core_site_data.get('hadoop.security.group.mapping.ldap.bind.user') + if ldap_bin_user: + ldap_user_regex = '.*uid=([^,]+).*' + match = re.search(ldap_user_regex, ldap_bin_user) + ldap_user_id = match.group(1) if match and match.group(1) else None + settings.append({"name": "auth_username", "value": ldap_user_id}) + settings.append({"name": "auth_password_script", "value": "/etc/hue/conf/altscript.sh hue.binduser.password"}) + if trino_coordinator_ingress and trino_coordinator_ingress.spec.rules: + coordinator_url = 'https://%s:443' % trino_coordinator_ingress.spec.rules[0].host + + settings.append({"name": "url", "value": coordinator_url}) + + trino.update({ + 'dialect': 'trino', + 'interface': 'trino', + 'settings': json.dumps(settings) + }) diff --git a/desktop/core/src/desktop/models.py b/desktop/core/src/desktop/models.py index 3c7285c5c23..96142cad284 100644 --- a/desktop/core/src/desktop/models.py +++ b/desktop/core/src/desktop/models.py @@ -1909,7 +1909,7 @@ def _get_editor(self): 'optimizer': get_optimizer_mode(), 'page': '/editor/?type=%(type)s' % interpreter, 'is_sql': interpreter['is_sql'], - 'is_batchable': interpreter['dialect'] in ['hive', 'impala'] or interpreter['interface'] in ['oozie', 'sqlalchemy'], + 'is_batchable': interpreter['dialect'] in ['hive', 'impala', 'trino'] or interpreter['interface'] in ['oozie', 'sqlalchemy'], 'dialect': interpreter['dialect'], 'dialect_properties': interpreter.get('dialect_properties'), }) diff --git a/desktop/libs/notebook/src/notebook/api.py b/desktop/libs/notebook/src/notebook/api.py index 833bb9afc39..2c4739c8a5e 100644 --- a/desktop/libs/notebook/src/notebook/api.py +++ b/desktop/libs/notebook/src/notebook/api.py @@ -752,7 +752,7 @@ def autocomplete(request, server=None, database=None, table=None, column=None, n # Passed by check_document_access_permission but unused by APIs notebook = json.loads(request.POST.get('notebook', '{}')) cluster = json.loads(request.POST.get('cluster', '{}')) - if cluster and cluster.get('type') in ('hive-compute', 'impala-compute'): + if cluster and cluster.get('type') in ('hive-compute', 'impala-compute', 'trino-compute'): snippet = cluster else: snippet = json.loads(request.POST.get('snippet', '{}')) diff --git a/desktop/libs/notebook/src/notebook/conf.py b/desktop/libs/notebook/src/notebook/conf.py index e214c910056..38b64516903 100644 --- a/desktop/libs/notebook/src/notebook/conf.py +++ b/desktop/libs/notebook/src/notebook/conf.py @@ -102,7 +102,7 @@ def get_ordered_interpreters(user=None): for interpreter in INTERPRETERS_CACHE: if check_has_missing_permission(user, interpreter, user_apps=user_apps): pass # Not allowed - elif has_computes and interpreter in ('hive', 'impala') and not computes_for_dialect(interpreter, user): + elif has_computes and interpreter in ('hive', 'impala', 'trino') and not computes_for_dialect(interpreter, user): pass # No available computes for the dialect so skip else: user_interpreters.append(interpreter) diff --git a/desktop/libs/notebook/src/notebook/connectors/base.py b/desktop/libs/notebook/src/notebook/connectors/base.py index 2d8c0e1f2fc..03491cf169e 100644 --- a/desktop/libs/notebook/src/notebook/connectors/base.py +++ b/desktop/libs/notebook/src/notebook/connectors/base.py @@ -27,7 +27,7 @@ from beeswax.common import find_compute, is_compute from desktop.auth.backend import is_admin -from desktop.conf import TASK_SERVER, has_connectors +from desktop.conf import TASK_SERVER, has_connectors, is_cdw_compute_enabled from desktop.lib import export_csvxls from desktop.lib.exceptions_renderable import PopupException from desktop.lib.i18n import smart_unicode @@ -439,10 +439,10 @@ def get_api(request, snippet): if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user): LOG.debug('Using the interpreter from snippet') interpreter = snippet.get('interpreter') - elif is_compute(snippet): + elif is_cdw_compute_enabled(): LOG.debug("Finding the compute from db using snippet: %s" % snippet) interpreter = find_compute(cluster=snippet, user=request.user) - else: + if interpreter is None: LOG.debug("Picking up the connectors from the configs using connector_name: %s" % connector_name) interpreter = get_interpreter(connector_type=connector_name, user=request.user) diff --git a/desktop/libs/notebook/src/notebook/connectors/trino.py b/desktop/libs/notebook/src/notebook/connectors/trino.py index 4b3c2f5a49c..da5bc77835a 100644 --- a/desktop/libs/notebook/src/notebook/connectors/trino.py +++ b/desktop/libs/notebook/src/notebook/connectors/trino.py @@ -57,7 +57,7 @@ class TrinoApi(Api): def __init__(self, user, interpreter=None): Api.__init__(self, user, interpreter=interpreter) self.options = interpreter['options'] - self.server_host, self.server_port, self.http_scheme = self.parse_api_url(self.options['url']) + self.server_host, self.server_port, self.http_scheme = self.parse_api_url(self.options.get('url')) self.auth = None auth_username = self.options.get('auth_username', DEFAULT_AUTH_USERNAME.get())