Skip to content

Commit

Permalink
Simplified NakadiSQL to use PUT
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry-Erokhin committed Jan 30, 2024
1 parent 3b1b798 commit 75dfdfb
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 86 deletions.
24 changes: 6 additions & 18 deletions clin/clients/nakadi_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from clin.clients.nakadi import NakadiError
from clin.clients.http_client import HttpClient, ro_auth_from_payload, auth_to_payload
from clin.models.auth import Auth
from clin.models.event_type import EventType
from clin.models.sql_query import SqlQuery, OutputEventType
from clin.models.shared import Category, Cleanup, Audience, Partitioning
Expand All @@ -25,31 +24,20 @@ def get_sql_query(self, event_type: EventType) -> Optional[SqlQuery]:
f"Nakadi error trying to get sql query '{event_type}'", e.response
)

def create_sql_query(self, sql_query: SqlQuery):
resp = self._post("queries", data=json.dumps(sql_query_to_payload(sql_query)))
def create_sql_query(self, query: SqlQuery):
resp = self._post("queries", data=json.dumps(sql_query_to_payload(query)))
if resp.status_code != 201:
raise NakadiError(
f"Nakadi error during creation of sql query '{sql_query.name}'", resp
f"Nakadi error during creation of sql query '{query.name}'", resp
)

def update_sql_query_auth(self, query_name: str, auth: Auth):
resp = self._put(
f"queries/{query_name}/authorization",
data=json.dumps(auth_to_payload(auth)),
)
def update_sql_query(self, query: SqlQuery):
resp = self._put(f"queries/{query.name}", data=json.dumps(sql_query_to_payload(query)))
if resp.status_code != 200:
raise NakadiError(
f"Nakadi error during updating of sql query '{query_name}'", resp
f"Nakadi error during updating sql query '{query.name}'", resp
)

def update_sql_query_sql(self, query_name: str, sql: str):
resp = self._put(f"queries/{query_name}/sql", data=json.dumps({"sql": sql}))
if resp.status_code != 204:
raise NakadiError(
f"Nakadi error during updating of sql query '{query_name}'", resp
)


def output_event_type_from_payload(
event_type: EventType, payload: dict
) -> OutputEventType:
Expand Down
99 changes: 31 additions & 68 deletions clin/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
ERROR_COLOR = Fore.RED
UP_TO_DATE_COLOR = Fore.GREEN
OUTPUT_INDENTATION = 4
ALLOWED_SQL_CHANGE_PATHS_PREFIXES = [
"root.auth",
"root.output_event_type.annotations",
"root.sql"]


class Processor:
def __init__(
self,
config: AppConfig,
token: Optional[str],
execute: bool = False,
show_diff: bool = False,
show_payload: bool = False,
self,
config: AppConfig,
token: Optional[str],
execute: bool = False,
show_diff: bool = False,
show_payload: bool = False,
):
self.apply_func_per_kind: Dict[Kind, Callable[[str, dict], None]] = {
Kind.EVENT_TYPE: self.apply_event_type,
Expand Down Expand Up @@ -84,7 +88,7 @@ def apply_sql_query(self, env: str, spec: dict):
nakadi_sql = self._get_nakadi_sql(env)
query = SqlQuery.from_spec(spec)

def get_changed_paths(diff: dict):
def is_allowed_change(diff: dict):
changed_keys = itertools.chain(
diff.get("values_changed", {}).keys(),
diff.get("iterable_item_removed", {}).keys(),
Expand All @@ -93,62 +97,32 @@ def get_changed_paths(diff: dict):
diff.get("dictionary_item_removed", []),
)

return set(changed_keys)
for key in changed_keys:
if not any(key.startswith(prefix) for prefix in ALLOWED_SQL_CHANGE_PATHS_PREFIXES):
return False

return True

try:
current_et = nakadi.get_event_type(query.name)
current = nakadi_sql.get_sql_query(current_et) if current_et else None
if current:
logging.debug("Found existing %s", query)

diff = DeepDiff(
current, query, ignore_order=True, report_repetition=True
)

changed_paths = get_changed_paths(diff)

# TODO: use put on sql endpoints for auth, sql and annotations
# TODO: prohibit changes that are not taking effect

diff = DeepDiff(current, query, ignore_order=True, report_repetition=True)
if diff:
self._maybe_print_diff(query, diff)

changed_auth = [p for p in changed_paths if p.startswith("root.auth")]
changed_sql = [p for p in changed_paths if p.startswith("root.sql")]
changed_ann = [p for p in changed_paths if p.startswith("root.output_event_type.annotations")]

if not (changed_auth or changed_sql or changed_ann):
logging.info(
f"{ERROR_COLOR}× Modifying is forbidden:{Fore.RESET} %s",
query,
)
else:
if is_allowed_change(diff):
self._maybe_print_payload(query)
if changed_auth:
self._update_sql_query_auth(nakadi_sql, query)
if changed_sql:
self._update_sql_query_sql(nakadi_sql, query)
if changed_ann:
print("Will update annotations")
# if self.execute:
# nakadi_sql.update_sql_query_sql(query.name, query.sql)
# logging.info(f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s query", query)
# else:
# logging.info(f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s query", query)
#


# self._update_annotations(nakadi, current_et)

self._update_sql_query(nakadi_sql, query)
else:
logging.info(f"{ERROR_COLOR}× Modifying is forbidden:{Fore.RESET} %s", query)
else:
logging.info(
f"{UP_TO_DATE_COLOR}✔ Up to date:{Fore.RESET} %s", query
)
logging.info(f"{UP_TO_DATE_COLOR}✔ Up to date:{Fore.RESET} %s", query)

else:
logging.debug("Not found existing %s", query)
self._maybe_print_payload(query)
self._create_sql_query(nakadi, nakadi_sql, query)
self._create_sql_query(nakadi_sql, query)

except NakadiError as err:
raise ProcessingError(f"Can not process {query}: {err}") from err
Expand Down Expand Up @@ -253,31 +227,20 @@ def _create_subscription(self, nakadi: Nakadi, sub: Subscription):
else:
logging.info(f"{MODIFY_COLOR}⦿ Will create:{Fore.RESET} %s", sub)

def _update_sql_query_auth(self, nakadi_sql: NakadiSql, query: SqlQuery):
if self.execute:
nakadi_sql.update_sql_query_auth(query.name, query.auth)
logging.info(
f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s authentication", query
)
else:
logging.info(
f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s authentication", query
)

def _update_sql_query_sql(self, nakadi_sql: NakadiSql, query: SqlQuery):
if self.execute:
nakadi_sql.update_sql_query_sql(query.name, query.sql)
logging.info(f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s query", query)
else:
logging.info(f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s query", query)

def _create_sql_query(self, nakadi: Nakadi, nakadi_sql: NakadiSql, query: SqlQuery):
def _create_sql_query(self, nakadi_sql: NakadiSql, query: SqlQuery):
if self.execute:
nakadi_sql.create_sql_query(query)
logging.info(f"{MODIFY_COLOR}⦿ Created:{Fore.RESET} %s", query)
else:
logging.info(f"{MODIFY_COLOR}⦿ Will create:{Fore.RESET} %s", query)

def _update_sql_query(self, nakadi_sql: NakadiSql, query: SqlQuery):
if self.execute:
nakadi_sql.update_sql_query(query)
logging.info(f"{MODIFY_COLOR}⦿ Updated:{Fore.RESET} %s", query)
else:
logging.info(f"{MODIFY_COLOR}⦿ Will update:{Fore.RESET} %s", query)

def _get_nakadi(self, env: str) -> Nakadi:
if env not in self.config.environments:
raise ProcessingError(f"Unknown environment: {env}")
Expand Down

0 comments on commit 75dfdfb

Please sign in to comment.