Skip to content

Commit

Permalink
DWX-17085: added compute support for trino
Browse files Browse the repository at this point in the history
With this change, trino compute can be discovered, saved to db and used
similar to hive and impala.
  • Loading branch information
amitsrivastava committed Sep 5, 2024
1 parent 5ec331d commit e9e5195
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 9 deletions.
6 changes: 4 additions & 2 deletions apps/beeswax/src/beeswax/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

RELATION_OPS = ['=', '<>', '<', '<=', '>', '>='] + RELATION_OPS_UNARY

COMPUTE_TYPES = ['hive-compute', 'impala-compute']
COMPUTE_TYPES = ['hive-compute', 'impala-compute', 'trino-compute', 'trino']

TERMINATORS = [
# (hive representation, description, ascii value)
Expand Down Expand Up @@ -135,7 +135,9 @@ def find_compute(cluster=None, user=None, dialect=None, namespace_id=None):
dialect = selected_compute['dialect'] if selected_compute.get('dialect') else dialect
if not dialect and cluster.get('type'):
t = cluster['type']
dialect = 'hive' if t.startswith('hive') else 'impala' if t.startswith('impala') else None
dialect = 'hive' if t.startswith('hive') else\
'impala' if t.startswith('impala') else\
'trino' if t.startswith('trino') else None

# We will attempt to find a default compute based on other criteria
ns = None
Expand Down
4 changes: 4 additions & 0 deletions desktop/core/src/desktop/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2257,6 +2257,10 @@ def has_connectors():
'''When the connector feature is turned on'''
return ENABLE_CONNECTORS.get()

def is_cdw_compute_enabled():
'''When the computes feature is turned on'''
clusters = CLUSTERS.get()
return bool(clusters and [c for c in clusters.values() if c.TYPE.get() == 'cdw'])

CLUSTERS = UnspecifiedConfigSection(
"clusters",
Expand Down
51 changes: 49 additions & 2 deletions desktop/core/src/desktop/management/commands/sync_warehouses.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

core_v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
networking_v1 = client.NetworkingV1Api()

SERVER_HELP = r"""
Sync up the desktop_connectors with the available hive and impala warehouses
Expand All @@ -64,6 +65,7 @@ def sync_warehouses(args, options):

hives = [c for c in computes if c['dialect'] == 'hive']
impalas = [c for c in computes if c['dialect'] == 'impala']
trinos = [c for c in computes if c['dialect'] == 'trino']

(hive_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_HIVE_WAREHOUSE",
Expand All @@ -75,6 +77,11 @@ def sync_warehouses(args, options):
defaults={'name': 'CDW Impala', 'description': 'CDW Impala Warehouse', 'dialect': 'impala', 'interface': 'hiveserver2'})
add_computes_to_warehouse(impala_warehouse, impalas)

(trino_warehouse, created) = models.Namespace.objects.get_or_create(
external_id="CDW_TRINO_WAREHOUSE",
defaults={'name': 'CDW Trino', 'description': 'CDW Trino Warehouse', 'dialect': 'trino', 'interface': 'trino'})
add_computes_to_warehouse(trino_warehouse, trinos)

LOG.info("Synced computes")
LOG.debug("Current computes %s" % models.Compute.objects.all())

Expand Down Expand Up @@ -104,7 +111,7 @@ def get_computes_from_k8s():
namespace = n.metadata.name
LOG.info('Getting details for ns: %s' % namespace)
item = {
'name': n.metadata.labels.get('displayname'),
'name': n.metadata.labels.get('displayname', namespace),
'description': '%s (%s)' % (n.metadata.labels.get('displayname'), n.metadata.name),
'external_id': namespace,
# 'creation_timestamp': n.metadata.labels.get('creation_timestamp'),
Expand All @@ -118,8 +125,11 @@ def get_computes_from_k8s():
elif namespace.startswith('impala-'):
populate_impala(namespace, item)
computes.append(item)
elif namespace.startswith('trino-'):
update_trino_configs(namespace, item)
computes.append(item)
except Exception as e:
LOG.exception('Could not get details for ns: %s' % n)
LOG.exception('Could not get details for ns: %s' % (n.metadata.name if n.metadata is not None else n))

return computes

Expand Down Expand Up @@ -224,3 +234,40 @@ def update_impala_configs(namespace, impala, host):
'ldap_groups': ldap_groups.split(",") if ldap_groups else None,
'settings': json.dumps(settings)
})

def update_trino_configs(namespace, trino):
deployments = apps_v1.list_namespaced_deployment(namespace).items
stfs = apps_v1.list_namespaced_stateful_set(namespace).items
ingresses = networking_v1.list_namespaced_ingress(namespace).items
trino_worker_dep = next((d for d in deployments if d.metadata.labels['app'] == 'trino' and d.metadata.labels['component'] == 'trino-worker'), None)
trino_coordinator_stfs = next((s for s in stfs if s.metadata.labels['app'] == 'trino-coordinator'), None)
trino_coordinator_ingress = next((i for i in ingresses if i.metadata.name == 'trino-coordinator-ingress'), None)

trino['is_ready'] = trino_worker_dep and trino_worker_dep.status.ready_replicas and trino_coordinator_stfs and trino_coordinator_stfs.status.ready_replicas

if not trino['is_ready']:
LOG.info("Trino %s not ready" % namespace)

coordinator_url = 'http://trino-coordinator.%s.svc.cluster.local:8080' % namespace
settings = []

trino_coordinator_configs = core_v1.read_namespaced_config_map('trino-coordinator-config', namespace)
core_site_data = confparse.ConfParse(trino_coordinator_configs.data['core-site.xml'])
ldap_bin_user = core_site_data.get('hadoop.security.group.mapping.ldap.bind.user')
if ldap_bin_user:
ldap_user_regex = '.*uid=([^,]+).*'
match = re.search(ldap_user_regex, ldap_bin_user)
ldap_user_id = match.group(1) if match and match.group(1) else None
settings.append({"name": "auth_username", "value": ldap_user_id})
settings.append({"name": "auth_password_script", "value": "/etc/hue/conf/altscript.sh hue.binduser.password"})
if trino_coordinator_ingress and trino_coordinator_ingress.spec.rules:
coordinator_url = 'https://%s:443' % trino_coordinator_ingress.spec.rules[0].host

settings.append({"name": "url", "value": coordinator_url})

trino.update({
'dialect': 'trino',
'interface': 'trino',
#'ldap_groups': ldap_groups.split(",") if ldap_groups else None,
'settings': json.dumps(settings)
})
2 changes: 1 addition & 1 deletion desktop/core/src/desktop/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1909,7 +1909,7 @@ def _get_editor(self):
'optimizer': get_optimizer_mode(),
'page': '/editor/?type=%(type)s' % interpreter,
'is_sql': interpreter['is_sql'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
'is_batchable': interpreter['dialect'] in ['hive', 'impala', 'trino'] or interpreter['interface'] in ['oozie', 'sqlalchemy'],
'dialect': interpreter['dialect'],
'dialect_properties': interpreter.get('dialect_properties'),
})
Expand Down
2 changes: 1 addition & 1 deletion desktop/libs/notebook/src/notebook/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def get_ordered_interpreters(user=None):
for interpreter in INTERPRETERS_CACHE:
if check_has_missing_permission(user, interpreter, user_apps=user_apps):
pass # Not allowed
elif has_computes and interpreter in ('hive', 'impala') and not computes_for_dialect(interpreter, user):
elif has_computes and interpreter in ('hive', 'impala', 'trino') and not computes_for_dialect(interpreter, user):
pass # No available computes for the dialect so skip
else:
user_interpreters.append(interpreter)
Expand Down
6 changes: 3 additions & 3 deletions desktop/libs/notebook/src/notebook/connectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from beeswax.common import find_compute, is_compute
from desktop.auth.backend import is_admin
from desktop.conf import TASK_SERVER, has_connectors
from desktop.conf import TASK_SERVER, has_connectors, is_cdw_compute_enabled
from desktop.lib import export_csvxls
from desktop.lib.exceptions_renderable import PopupException
from desktop.lib.i18n import smart_unicode
Expand Down Expand Up @@ -439,10 +439,10 @@ def get_api(request, snippet):
if has_connectors() and snippet.get('type') == 'hello' and is_admin(request.user):
LOG.debug('Using the interpreter from snippet')
interpreter = snippet.get('interpreter')
elif is_compute(snippet):
elif is_cdw_compute_enabled():
LOG.debug("Finding the compute from db using snippet: %s" % snippet)
interpreter = find_compute(cluster=snippet, user=request.user)
else:
if interpreter is None:
LOG.debug("Picking up the connectors from the configs using connector_name: %s" % connector_name)
interpreter = get_interpreter(connector_type=connector_name, user=request.user)

Expand Down

0 comments on commit e9e5195

Please sign in to comment.