From 75dfdfb08c3377ad65480c3526eded704d1f1025 Mon Sep 17 00:00:00 2001 From: Dmitry Erokhin Date: Wed, 31 Jan 2024 00:02:23 +0100 Subject: [PATCH] Simplified NakadiSQL to use PUT --- clin/clients/nakadi_sql.py | 24 +++------ clin/processor.py | 99 ++++++++++++-------------------------- 2 files changed, 37 insertions(+), 86 deletions(-) diff --git a/clin/clients/nakadi_sql.py b/clin/clients/nakadi_sql.py index 4cf7196..7a4285e 100644 --- a/clin/clients/nakadi_sql.py +++ b/clin/clients/nakadi_sql.py @@ -5,7 +5,6 @@ from clin.clients.nakadi import NakadiError from clin.clients.http_client import HttpClient, ro_auth_from_payload, auth_to_payload -from clin.models.auth import Auth from clin.models.event_type import EventType from clin.models.sql_query import SqlQuery, OutputEventType from clin.models.shared import Category, Cleanup, Audience, Partitioning @@ -25,31 +24,20 @@ def get_sql_query(self, event_type: EventType) -> Optional[SqlQuery]: f"Nakadi error trying to get sql query '{event_type}'", e.response ) - def create_sql_query(self, sql_query: SqlQuery): - resp = self._post("queries", data=json.dumps(sql_query_to_payload(sql_query))) + def create_sql_query(self, query: SqlQuery): + resp = self._post("queries", data=json.dumps(sql_query_to_payload(query))) if resp.status_code != 201: raise NakadiError( - f"Nakadi error during creation of sql query '{sql_query.name}'", resp + f"Nakadi error during creation of sql query '{query.name}'", resp ) - def update_sql_query_auth(self, query_name: str, auth: Auth): - resp = self._put( - f"queries/{query_name}/authorization", - data=json.dumps(auth_to_payload(auth)), - ) + def update_sql_query(self, query: SqlQuery): + resp = self._put(f"queries/{query.name}", data=json.dumps(sql_query_to_payload(query))) if resp.status_code != 200: raise NakadiError( - f"Nakadi error during updating of sql query '{query_name}'", resp + f"Nakadi error during updating sql query '{query.name}'", resp ) - def update_sql_query_sql(self, query_name: str, sql: str): - resp = self._put(f"queries/{query_name}/sql", data=json.dumps({"sql": sql})) - if resp.status_code != 204: - raise NakadiError( - f"Nakadi error during updating of sql query '{query_name}'", resp - ) - - def output_event_type_from_payload( event_type: EventType, payload: dict ) -> OutputEventType: diff --git a/clin/processor.py b/clin/processor.py index 72b0ac4..40c63b4 100644 --- a/clin/processor.py +++ b/clin/processor.py @@ -25,16 +25,20 @@ ERROR_COLOR = Fore.RED UP_TO_DATE_COLOR = Fore.GREEN OUTPUT_INDENTATION = 4 +ALLOWED_SQL_CHANGE_PATHS_PREFIXES = [ + "root.auth", + "root.output_event_type.annotations", + "root.sql"] class Processor: def __init__( - self, - config: AppConfig, - token: Optional[str], - execute: bool = False, - show_diff: bool = False, - show_payload: bool = False, + self, + config: AppConfig, + token: Optional[str], + execute: bool = False, + show_diff: bool = False, + show_payload: bool = False, ): self.apply_func_per_kind: Dict[Kind, Callable[[str, dict], None]] = { Kind.EVENT_TYPE: self.apply_event_type, @@ -84,7 +88,7 @@ def apply_sql_query(self, env: str, spec: dict): nakadi_sql = self._get_nakadi_sql(env) query = SqlQuery.from_spec(spec) - def get_changed_paths(diff: dict): + def is_allowed_change(diff: dict): changed_keys = itertools.chain( diff.get("values_changed", {}).keys(), diff.get("iterable_item_removed", {}).keys(), @@ -93,62 +97,32 @@ def get_changed_paths(diff: dict): diff.get("dictionary_item_removed", []), ) - return set(changed_keys) + for key in changed_keys: + if not any(key.startswith(prefix) for prefix in ALLOWED_SQL_CHANGE_PATHS_PREFIXES): + return False + + return True try: current_et = nakadi.get_event_type(query.name) current = nakadi_sql.get_sql_query(current_et) if current_et else None if current: logging.debug("Found existing %s", query) - - diff = DeepDiff( - current, query, ignore_order=True, report_repetition=True - ) - - changed_paths = get_changed_paths(diff) - - # TODO: use put on sql endpoints for auth, sql and annotations - # TODO: prohibit changes that are not taking effect - + diff = DeepDiff(current, query, ignore_order=True, report_repetition=True) if diff: self._maybe_print_diff(query, diff) - - changed_auth = [p for p in changed_paths if p.startswith("root.auth")] - changed_sql = [p for p in changed_paths if p.startswith("root.sql")] - changed_ann = [p for p in changed_paths if p.startswith("root.output_event_type.annotations")] - - if not (changed_auth or changed_sql or changed_ann): - logging.info( - f"{ERROR_COLOR}× Modifying is forbidden:{Fore.RESET} %s", - query, - ) - else: + if is_allowed_change(diff): self._maybe_print_payload(query) - if changed_auth: - self._update_sql_query_auth(nakadi_sql, query) - if changed_sql: - self._update_sql_query_sql(nakadi_sql, query) - if changed_ann: - print("Will update annotations") - # if self.execute: - # nakadi_sql.update_sql_query_sql(query.name, query.sql) - # logging.info(f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s query", query) - # else: - # logging.info(f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s query", query) - # - - - # self._update_annotations(nakadi, current_et) - + self._update_sql_query(nakadi_sql, query) + else: + logging.info(f"{ERROR_COLOR}× Modifying is forbidden:{Fore.RESET} %s", query) else: - logging.info( - f"{UP_TO_DATE_COLOR}✔ Up to date:{Fore.RESET} %s", query - ) + logging.info(f"{UP_TO_DATE_COLOR}✔ Up to date:{Fore.RESET} %s", query) else: logging.debug("Not found existing %s", query) self._maybe_print_payload(query) - self._create_sql_query(nakadi, nakadi_sql, query) + self._create_sql_query(nakadi_sql, query) except NakadiError as err: raise ProcessingError(f"Can not process {query}: {err}") from err @@ -253,31 +227,20 @@ def _create_subscription(self, nakadi: Nakadi, sub: Subscription): else: logging.info(f"{MODIFY_COLOR}⦿ Will create:{Fore.RESET} %s", sub) - def _update_sql_query_auth(self, nakadi_sql: NakadiSql, query: SqlQuery): - if self.execute: - nakadi_sql.update_sql_query_auth(query.name, query.auth) - logging.info( - f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s authentication", query - ) - else: - logging.info( - f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s authentication", query - ) - - def _update_sql_query_sql(self, nakadi_sql: NakadiSql, query: SqlQuery): - if self.execute: - nakadi_sql.update_sql_query_sql(query.name, query.sql) - logging.info(f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s query", query) - else: - logging.info(f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s query", query) - - def _create_sql_query(self, nakadi: Nakadi, nakadi_sql: NakadiSql, query: SqlQuery): + def _create_sql_query(self, nakadi_sql: NakadiSql, query: SqlQuery): if self.execute: nakadi_sql.create_sql_query(query) logging.info(f"{MODIFY_COLOR}⦿ Created:{Fore.RESET} %s", query) else: logging.info(f"{MODIFY_COLOR}⦿ Will create:{Fore.RESET} %s", query) + def _update_sql_query(self, nakadi_sql: NakadiSql, query: SqlQuery): + if self.execute: + nakadi_sql.update_sql_query(query) + logging.info(f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s", query) + else: + logging.info(f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s", query) + def _get_nakadi(self, env: str) -> Nakadi: if env not in self.config.environments: raise ProcessingError(f"Unknown environment: {env}")