Skip to content

Commit

Permalink
DWX-17085: added compute support for trino
Browse files Browse the repository at this point in the history
With this change, trino compute can be discovered, saved to db and used
similar to hive and impala.
  • Loading branch information
amitsrivastava committed Sep 5, 2024
1 parent 5ec331d commit b6432c8
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 16 deletions.
28 changes: 19 additions & 9 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
"""
from __future__ import print_function

import numbers
import re
import time
import numbers

from django import forms

from beeswax.models import Namespace, Compute
from beeswax.models import Compute, Namespace

HIVE_IDENTIFER_REGEX = re.compile("(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")
HIVE_IDENTIFER_REGEX = re.compile(r"(^[a-zA-Z0-9]\w*\.)?[a-zA-Z0-9]\w*$")

DL_FORMATS = ['csv', 'xls']

Expand All @@ -44,7 +44,7 @@

RELATION_OPS = ['=', '<>', '<', '<=', '>', '>='] + RELATION_OPS_UNARY

COMPUTE_TYPES = ['hive-compute', 'impala-compute']
COMPUTE_TYPES = ['hive-compute', 'impala-compute', 'trino-compute', 'trino']

TERMINATORS = [
# (hive representation, description, ascii value)
Expand All @@ -56,12 +56,13 @@
(' ', "Space", 32),
]


def timing(fn):
  """Decorator that prints the wall-clock duration of every call to ``fn``.

  The wrapped function's return value is passed through unchanged. The
  elapsed time is printed in milliseconds, prefixed with ``fn.__name__``.
  """
  import functools  # local import so this block does not depend on the file header

  # Preserve fn's __name__/__doc__ on the wrapper so decorated functions stay
  # identifiable in logs, tracebacks and introspection.
  @functools.wraps(fn)
  def decorator(*args, **kwargs):
    time1 = time.time()
    ret = fn(*args, **kwargs)
    time2 = time.time()
    print('%s elapsed time: %0.3f ms' % (fn.__name__, (time2 - time1) * 1000.0))
    return ret
  return decorator

Expand All @@ -79,7 +80,8 @@ def apply_natural_sort(collection, key=None):
Applies a natural sort (http://rosettacode.org/wiki/Natural_sorting) to a list or dictionary
Dictionary types require a sort key to be specified
"""
to_digit = lambda i: int(i) if i.isdigit() else i
def to_digit(i):
return int(i) if i.isdigit() else i

def tokenize_and_convert(item, key=None):
if key:
Expand All @@ -94,7 +96,9 @@ def is_compute(cluster):
return False
connector = cluster.get('connector')
compute = cluster.get('compute')
compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES
return compute_check(cluster) or compute_check(connector) or compute_check(compute)


Expand All @@ -107,12 +111,16 @@ def is_compute(cluster):
3. Lookup namespace based on dialect from cluster or provided dialect
and return the first compute filtered by user-access. Needs valid user
'''


def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
if cluster:
# If we find a full/partial cluster object, we will attempt to load a compute
connector = cluster.get('connector')
compute = cluster.get('compute')
compute_check = lambda x: x and x.get('type') in COMPUTE_TYPES

def compute_check(x):
return x and x.get('type') in COMPUTE_TYPES

# Pick the most probable compute object
selected_compute = (cluster if compute_check(cluster)
Expand All @@ -135,7 +143,9 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
dialect = selected_compute['dialect'] if selected_compute.get('dialect') else dialect
if not dialect and cluster.get('type'):
t = cluster['type']
dialect = 'hive' if t.startswith('hive') else 'impala' if t.startswith('impala') else None
dialect = 'hive' if t.startswith('hive') else\
'impala' if t.startswith('impala') else\
'trino' if t.startswith('trino') else None

# We will attempt to find a default compute based on other criteria
ns = None
Expand Down
6 changes: 6 additions & 0 deletions desktop/core/src/desktop/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,12 @@ def has_connectors():
return ENABLE_CONNECTORS.get()


def is_cdw_compute_enabled():
  """Return True when at least one configured cluster has TYPE 'cdw'.

  Returns False when no clusters are configured at all.
  """
  configured = CLUSTERS.get()
  if not configured:
    return False
  return any(cluster.TYPE.get() == 'cdw' for cluster in configured.values())


CLUSTERS = UnspecifiedConfigSection(
"clusters",
help="One entry for each additional remote cluster Hue can interact with.",
Expand Down
55 changes: 53 additions & 2 deletions desktop/core/src/desktop/management/commands/sync_warehouses.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
networking_v1 = client.NetworkingV1Api()

SERVER_HELP = r"""
Sync up the desktop_connectors with the available hive and impala warehouses
Expand All @@ -64,6 +65,7 @@ def sync_warehouses(args, options):

hives = [c for c in computes if c['dialect'] == 'hive']
impalas = [c for c in computes if c['dialect'] == 'impala']
trinos = [c for c in computes if c['dialect'] == 'trino']

(hive_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_HIVE_WAREHOUSE",
Expand All @@ -75,6 +77,11 @@ def sync_warehouses(args, options):
defaults={'name': 'CDW Impala', 'description': 'CDW Impala Warehouse', 'dialect': 'impala', 'interface': 'hiveserver2'})
add_computes_to_warehouse(impala_warehouse, impalas)

(trino_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_TRINO_WAREHOUSE",
defaults={'name': 'CDW Trino', 'description': 'CDW Trino Warehouse', 'dialect': 'trino', 'interface': 'trino'})
add_computes_to_warehouse(trino_warehouse, trinos)

LOG.info("Synced computes")
LOG.debug("Current computes %s" % models.Compute.objects.all())

Expand Down Expand Up @@ -104,7 +111,7 @@ def get_computes_from_k8s():
namespace = n.metadata.name
LOG.info('Getting details for ns: %s' % namespace)
item = {
'name': n.metadata.labels.get('displayname'),
'name': n.metadata.labels.get('displayname', namespace),
'description': '%s (%s)' % (n.metadata.labels.get('displayname'), n.metadata.name),
'external_id': namespace,
# 'creation_timestamp': n.metadata.labels.get('creation_timestamp'),
Expand All @@ -118,8 +125,11 @@ def get_computes_from_k8s():
elif namespace.startswith('impala-'):
populate_impala(namespace, item)
computes.append(item)
elif namespace.startswith('trino-'):
update_trino_configs(namespace, item)
computes.append(item)
except Exception as e:
LOG.exception('Could not get details for ns: %s' % n)
LOG.exception('Could not get details for ns: %s' % (n.metadata.name if n.metadata is not None else n))

return computes

Expand Down Expand Up @@ -224,3 +234,44 @@ def update_impala_configs(namespace, impala, host):
'ldap_groups': ldap_groups.split(",") if ldap_groups else None,
'settings': json.dumps(settings)
})


def update_trino_configs(namespace, trino):
  """Fill in readiness and connection settings for a Trino compute.

  Lists the deployments, stateful sets and ingresses of ``namespace`` and reads
  its ``trino-coordinator-config`` config map, then updates ``trino`` in place
  with ``is_ready``, ``dialect``, ``interface`` and a JSON-encoded ``settings``
  list (always containing ``url``; ``auth_username`` and
  ``auth_password_script`` are added when an LDAP bind user is configured).

  Args:
    namespace: Name of the Kubernetes namespace that hosts the Trino compute.
    trino: Dict describing the compute; mutated in place.
  """
  def has_labels(resource, **expected):
    # Labels can be absent altogether or lack a given key; treat both as
    # "no match" rather than raising TypeError/KeyError.
    labels = resource.metadata.labels or {}
    return all(labels.get(key) == value for key, value in expected.items())

  deployments = apps_v1.list_namespaced_deployment(namespace).items
  stateful_sets = apps_v1.list_namespaced_stateful_set(namespace).items
  ingresses = networking_v1.list_namespaced_ingress(namespace).items

  worker_dep = next((d for d in deployments if has_labels(d, app='trino', component='trino-worker')), None)
  coordinator_stfs = next((s for s in stateful_sets if has_labels(s, app='trino-coordinator')), None)
  coordinator_ingress = next((i for i in ingresses if i.metadata.name == 'trino-coordinator-ingress'), None)

  # Coerce to a real boolean: the bare ``and`` chain would otherwise store None
  # or a replica count instead of True/False.
  trino['is_ready'] = bool(
    worker_dep and worker_dep.status.ready_replicas
    and coordinator_stfs and coordinator_stfs.status.ready_replicas
  )

  if not trino['is_ready']:
    LOG.info("Trino %s not ready", namespace)

  # Default to the in-cluster coordinator service endpoint.
  coordinator_url = 'http://trino-coordinator.%s.svc.cluster.local:8080' % namespace
  settings = []

  coordinator_configs = core_v1.read_namespaced_config_map('trino-coordinator-config', namespace)
  core_site_data = confparse.ConfParse(coordinator_configs.data['core-site.xml'])
  ldap_bind_user = core_site_data.get('hadoop.security.group.mapping.ldap.bind.user')
  if ldap_bind_user:
    # Extract the uid component of the bind user value; None when there is no uid= part.
    match = re.search('.*uid=([^,]+).*', ldap_bind_user)
    ldap_user_id = match.group(1) if match else None
    settings.append({"name": "auth_username", "value": ldap_user_id})
    settings.append({"name": "auth_password_script", "value": "/etc/hue/conf/altscript.sh hue.binduser.password"})
    # NOTE(review): the HTTPS ingress URL is only chosen when an LDAP bind user
    # is configured -- confirm this nesting is intended.
    if coordinator_ingress and coordinator_ingress.spec.rules:
      coordinator_url = 'https://%s:443' % coordinator_ingress.spec.rules[0].host

  settings.append({"name": "url", "value": coordinator_url})

  trino.update({
    'dialect': 'trino',
    'interface': 'trino',
    'settings': json.dumps(settings)
  })
2 changes: 1 addition & 1 deletion desktop/core/src/desktop/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1909,7 +1909,7 @@ def _get_editor(self):
'optimizer': get_optimizer_mode(),
'page': '/editor/?type=%(type)s' % interpreter,
'is_sql': interpreter['is_sql'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala', 'trino'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
'dialect': interpreter['dialect'],
'dialect_properties': interpreter.get('dialect_properties'),
})
Expand Down
2 changes: 1 addition & 1 deletion desktop/libs/notebook/src/notebook/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def get_ordered_interpreters(user=None):
for interpreter in INTERPRETERS_CACHE:
if check_has_missing_permission(user, interpreter, user_apps=user_apps):
pass # Not allowed
elif has_computes and interpreter in ('hive', 'impala') and not computes_for_dialect(interpreter, user):
elif has_computes and interpreter in ('hive', 'impala', 'trino') and not computes_for_dialect(interpreter, user):
pass # No available computes for the dialect so skip
else:
user_interpreters.append(interpreter)
Expand Down
6 changes: 3 additions & 3 deletions desktop/libs/notebook/src/notebook/connectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from beeswax.common import find_compute, is_compute
from desktop.auth.backend import is_admin
from desktop.conf import TASK_SERVER, has_connectors
from desktop.conf import TASK_SERVER, has_connectors, is_cdw_compute_enabled
from desktop.lib import export_csvxls
from desktop.lib.exceptions_renderable import PopupException
from desktop.lib.i18n import smart_unicode
Expand Down Expand Up @@ -439,10 +439,10 @@ def get_api(request, snippet):
if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user):
LOG.debug('Using the interpreter from snippet')
interpreter = snippet.get('interpreter')
elif is_compute(snippet):
elif is_cdw_compute_enabled():
LOG.debug("Finding the compute from db using snippet: %s" % snippet)
interpreter = find_compute(cluster=snippet, user=request.user)
else:
if interpreter is None:
LOG.debug("Picking up the connectors from the configs using connector_name: %s" % connector_name)
interpreter = get_interpreter(connector_type=connector_name, user=request.user)

Expand Down

0 comments on commit b6432c8

Please sign in to comment.