diff --git a/conda/dev-environment-unix.yml b/conda/dev-environment-unix.yml index fa3740a5b..d6ad291c8 100644 --- a/conda/dev-environment-unix.yml +++ b/conda/dev-environment-unix.yml @@ -45,7 +45,6 @@ dependencies: - ruamel.yaml - ruff>=0.3,<0.4 - scikit-build - - slack-sdk - sqlalchemy - tar - threadpoolctl diff --git a/conda/dev-environment-win.yml b/conda/dev-environment-win.yml index 8ca2482d8..8c1ce5fdb 100644 --- a/conda/dev-environment-win.yml +++ b/conda/dev-environment-win.yml @@ -44,7 +44,6 @@ dependencies: - ruamel.yaml - ruff>=0.3,<0.4 - scikit-build - - slack-sdk - sqlalchemy - threadpoolctl - tornado diff --git a/csp/adapters/slack.py b/csp/adapters/slack.py index 0998e6983..6838504b4 100644 --- a/csp/adapters/slack.py +++ b/csp/adapters/slack.py @@ -1,372 +1,4 @@ -import threading -from logging import getLogger -from queue import Queue -from ssl import SSLContext -from threading import Thread -from time import sleep -from typing import Dict, List, Optional, TypeVar - -import csp -from csp.impl.adaptermanager import AdapterManagerImpl -from csp.impl.outputadapter import OutputAdapter -from csp.impl.pushadapter import PushInputAdapter -from csp.impl.struct import Struct -from csp.impl.types.tstype import ts -from csp.impl.wiring import py_output_adapter_def, py_push_adapter_def - try: - from slack_sdk.errors import SlackApiError - from slack_sdk.socket_mode import SocketModeClient - from slack_sdk.socket_mode.request import SocketModeRequest - from slack_sdk.socket_mode.response import SocketModeResponse - from slack_sdk.web import WebClient - - _HAVE_SLACK_SDK = True + from csp_adapter_slack import * # noqa: F403 except ImportError: - _HAVE_SLACK_SDK = False - -T = TypeVar("T") -log = getLogger(__file__) - - -__all__ = ("SlackMessage", "mention_user", "SlackAdapterManager", "SlackInputAdapterImpl", "SlackOutputAdapterImpl") - - -class SlackMessage(Struct): - user: str - user_email: str # email of the author - user_id: str # user id of the author - tags: List[str] # list of mentions - - channel: str # name of channel - channel_id: str # id of channel - channel_type: str # type of channel, in "message", "public" (app_mention), "private" (app_mention) - - msg: str # parsed text payload - reaction: str # emoji reacts - thread: str # thread id, if in thread - payload: dict # raw message payload - - -def mention_user(userid: str) -> str: - """Convenience method, more difficult to do in symphony but we want slack to be symmetric""" - return f"<@{userid}>" - - -class SlackAdapterManager(AdapterManagerImpl): - def __init__(self, app_token: str, bot_token: str, ssl: Optional[SSLContext] = None): - if not _HAVE_SLACK_SDK: - raise RuntimeError("Could not find slack-sdk installation") - if not app_token.startswith("xapp-") or not bot_token.startswith("xoxb-"): - raise RuntimeError("Slack app token or bot token looks malformed") - - self._slack_client = SocketModeClient( - app_token=app_token, - web_client=WebClient(token=bot_token, ssl=ssl), - ) - self._slack_client.socket_mode_request_listeners.append(self._process_slack_message) - - # down stream edges - self._subscribers = [] - self._publishers = [] - - # message queues - self._inqueue: Queue[SlackMessage] = Queue() - self._outqueue: Queue[SlackMessage] = Queue() - - # handler thread - self._running: bool = False - self._thread: Thread = None - - # lookups for mentions and redirection - self._room_id_to_room_name: Dict[str, str] = {} - self._room_id_to_room_type: Dict[str, str] = {} - self._room_name_to_room_id: Dict[str, str] = {} - self._user_id_to_user_name: Dict[str, str] = {} - self._user_id_to_user_email: Dict[str, str] = {} - self._user_name_to_user_id: Dict[str, str] = {} - self._user_email_to_user_id: Dict[str, str] = {} - - def subscribe(self): - return _slack_input_adapter(self, push_mode=csp.PushMode.NON_COLLAPSING) - - def publish(self, msg: ts[SlackMessage]): - return _slack_output_adapter(self, msg) - - def _create(self, engine, memo): - # We'll avoid having a second class and make our AdapterManager and AdapterManagerImpl the same - super().__init__(engine) - return self - - def start(self, starttime, endtime): - self._running = True - self._thread = threading.Thread(target=self._run, daemon=True) - self._thread.start() - - def stop(self): - if self._running: - self._running = False - self._slack_client.close() - self._thread.join() - - def register_subscriber(self, adapter): - if adapter not in self._subscribers: - self._subscribers.append(adapter) - - def register_publisher(self, adapter): - if adapter not in self._publishers: - self._publishers.append(adapter) - - def _get_user_from_id(self, user_id): - # try to pull from cache - name = self._user_id_to_user_name.get(user_id, None) - email = self._user_id_to_user_email.get(user_id, None) - - # if none, refresh data via web client - if name is None or email is None: - ret = self._slack_client.web_client.users_info(user=user_id) - if ret.status_code == 200: - # TODO OAuth scopes required - name = ret.data["user"]["profile"].get("real_name_normalized", ret.data["user"]["name"]) - email = ret.data["user"]["profile"]["email"] - self._user_id_to_user_name[user_id] = name - self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack? - self._user_id_to_user_email[user_id] = email - self._user_email_to_user_id[email] = user_id - return name, email - - def _get_user_from_name(self, user_name): - # try to pull from cache - user_id = self._user_name_to_user_id.get(user_name, None) - - # if none, refresh data via web client - if user_id is None: - # unfortunately the reverse lookup is not super nice... - # we need to pull all users and build the reverse mapping - ret = self._slack_client.web_client.users_list() - if ret.status_code == 200: - # TODO OAuth scopes required - for user in ret.data["members"]: - name = user["profile"].get("real_name_normalized", user["name"]) - user_id = user["profile"]["id"] - email = user["profile"]["email"] - self._user_id_to_user_name[user_id] = name - self._user_name_to_user_id[name] = user_id # TODO is this 1-1 in slack? - self._user_id_to_user_email[user_id] = email - self._user_email_to_user_id[email] = user_id - return self._user_name_to_user_id.get(user_name, None) - return user_id - - def _channel_data_to_channel_kind(self, data) -> str: - if data.get("is_im", False): - return "message" - if data.get("is_private", False): - return "private" - return "public" - - def _get_channel_from_id(self, channel_id): - # try to pull from cache - name = self._room_id_to_room_name.get(channel_id, None) - kind = self._room_id_to_room_type.get(channel_id, None) - - # if none, refresh data via web client - if name is None: - ret = self._slack_client.web_client.conversations_info(channel=channel_id) - if ret.status_code == 200: - # TODO OAuth scopes required - kind = self._channel_data_to_channel_kind(ret.data["channel"]) - if kind == "message": - # TODO use same behavior as symphony adapter - name = "DM" - else: - name = ret.data["channel"]["name"] - - self._room_id_to_room_name[channel_id] = name - self._room_name_to_room_id[name] = channel_id - self._room_id_to_room_type[channel_id] = kind - return name, kind - - def _get_channel_from_name(self, channel_name): - # try to pull from cache - channel_id = self._room_name_to_room_id.get(channel_name, None) - - # if none, refresh data via web client - if channel_id is None: - # unfortunately the reverse lookup is not super nice... - # we need to pull all channels and build the reverse mapping - ret = self._slack_client.web_client.conversations_list() - if ret.status_code == 200: - # TODO OAuth scopes required - for channel in ret.data["channels"]: - name = channel["name"] - channel_id = channel["id"] - kind = self._channel_data_to_channel_kind(channel) - self._room_id_to_room_name[channel_id] = name - self._room_name_to_room_id[name] = channel_id - self._room_id_to_room_type[channel_id] = kind - return self._room_name_to_room_id.get(channel_name, None) - return channel_id - - def _get_tags_from_message(self, blocks, authorizations=None) -> List[str]: - """extract tags from message, potentially excluding the bot's own @""" - authorizations = authorizations or [] - if len(authorizations) > 0: - bot_id = authorizations[0]["user_id"] # TODO more than one? - else: - bot_id = "" - - tags = [] - to_search = blocks.copy() - - while to_search: - element = to_search.pop() - # add subsections - if element.get("elements", []): - to_search.extend(element.get("elements")) - - if element.get("type", "") == "user": - tag_id = element.get("user_id") - if tag_id != bot_id: - # TODO tag with id or with name? - name, _ = self._get_user_from_id(tag_id) - if name: - tags.append(name) - return tags - - def _process_slack_message(self, client: SocketModeClient, req: SocketModeRequest): - log.info(req.payload) - if req.type == "events_api": - # Acknowledge the request anyway - response = SocketModeResponse(envelope_id=req.envelope_id) - client.send_socket_mode_response(response) - - if ( - req.payload["event"]["type"] in ("message", "app_mention") - and req.payload["event"].get("subtype") is None - ): - user, user_email = self._get_user_from_id(req.payload["event"]["user"]) - channel, channel_type = self._get_channel_from_id(req.payload["event"]["channel"]) - tags = self._get_tags_from_message(req.payload["event"]["blocks"], req.payload["authorizations"]) - slack_msg = SlackMessage( - user=user or "", - user_email=user_email or "", - user_id=req.payload["event"]["user"], - tags=tags, - channel=channel or "", - channel_id=req.payload["event"]["channel"], - channel_type=channel_type or "", - msg=req.payload["event"]["text"], - reaction="", - thread=req.payload["event"]["ts"], - payload=req.payload.copy(), - ) - self._inqueue.put(slack_msg) - - def _run(self): - self._slack_client.connect() - - while self._running: - # drain outbound - while not self._outqueue.empty(): - # pull SlackMessage from queue - slack_msg = self._outqueue.get() - - # refactor into slack command - # grab channel or DM - if hasattr(slack_msg, "channel_id") and slack_msg.channel_id: - channel_id = slack_msg.channel_id - elif hasattr(slack_msg, "channel") and slack_msg.channel: - # TODO DM - channel_id = self._get_channel_from_name(slack_msg.channel) - - # pull text or reaction - if ( - hasattr(slack_msg, "reaction") - and slack_msg.reaction - and hasattr(slack_msg, "thread") - and slack_msg.thread - ): - # TODO - self._slack_client.web_client.reactions_add( - channel=channel_id, - name=slack_msg.reaction, - timestamp=slack_msg.thread, - ) - elif hasattr(slack_msg, "msg") and slack_msg.msg: - try: - # send text to channel - self._slack_client.web_client.chat_postMessage( - channel=channel_id, - text=getattr(slack_msg, "msg", ""), - ) - except SlackApiError: - # TODO - ... - else: - # cannot send empty message, log an error - log.error(f"Received malformed SlackMessage instance: {slack_msg}") - - if not self._inqueue.empty(): - # pull all SlackMessages from queue - # do as burst to match SymphonyAdapter - slack_msgs = [] - while not self._inqueue.empty(): - slack_msgs.append(self._inqueue.get()) - - # push to all the subscribers - for adapter in self._subscribers: - adapter.push_tick(slack_msgs) - - # do short sleep - sleep(0.1) - - # liveness check - if not self._thread.is_alive(): - self._running = False - self._thread.join() - - # shut down socket client - try: - # TODO which one? - self._slack_client.close() - # self._slack_client.disconnect() - except AttributeError: - # TODO bug in slack sdk causes an exception to be thrown - # File "slack_sdk/socket_mode/builtin/connection.py", line 191, in disconnect - # self.sock.close() - # ^^^^^^^^^^^^^^^ - # AttributeError: 'NoneType' object has no attribute 'close' - ... - - def _on_tick(self, value): - self._outqueue.put(value) - - -class SlackInputAdapterImpl(PushInputAdapter): - def __init__(self, manager): - manager.register_subscriber(self) - super().__init__() - - -class SlackOutputAdapterImpl(OutputAdapter): - def __init__(self, manager): - manager.register_publisher(self) - self._manager = manager - super().__init__() - - def on_tick(self, time, value): - self._manager._on_tick(value) - - -_slack_input_adapter = py_push_adapter_def( - name="SlackInputAdapter", - adapterimpl=SlackInputAdapterImpl, - out_type=ts[List[SlackMessage]], - manager_type=SlackAdapterManager, -) -_slack_output_adapter = py_output_adapter_def( - name="SlackOutputAdapter", - adapterimpl=SlackOutputAdapterImpl, - manager_type=SlackAdapterManager, - input=ts[SlackMessage], -) + raise ModuleNotFoundError("Install `csp-adapter-slack` to use csp's Slack adapter") diff --git a/csp/tests/adapters/test_slack.py b/csp/tests/adapters/test_slack.py deleted file mode 100644 index a05feb892..000000000 --- a/csp/tests/adapters/test_slack.py +++ /dev/null @@ -1,231 +0,0 @@ -import pytest -from datetime import timedelta -from ssl import create_default_context -from unittest.mock import MagicMock, call, patch - -import csp -from csp import ts -from csp.adapters.slack import SlackAdapterManager, SlackMessage, mention_user - - -@csp.node -def hello(msg: ts[SlackMessage]) -> ts[SlackMessage]: - if csp.ticked(msg): - text = f"Hello <@{msg.user_id}>!" - return SlackMessage( - channel="a new channel", - # reply in thread - thread=msg.thread, - msg=text, - ) - - -@csp.node -def react(msg: ts[SlackMessage]) -> ts[SlackMessage]: - if csp.ticked(msg): - return SlackMessage( - channel=msg.channel, - channel_id=msg.channel_id, - thread=msg.thread, - reaction="eyes", - ) - - -@csp.node -def send_fake_message(clientmock: MagicMock, requestmock: MagicMock, am: SlackAdapterManager) -> ts[bool]: - with csp.alarms(): - a_send = csp.alarm(bool) - with csp.start(): - csp.schedule_alarm(a_send, timedelta(seconds=1), True) - if csp.ticked(a_send): - if a_send: - am._process_slack_message(clientmock, requestmock) - csp.schedule_alarm(a_send, timedelta(seconds=1), False) - else: - return True - - -PUBLIC_CHANNEL_MENTION_PAYLOAD = { - "token": "ABCD", - "team_id": "EFGH", - "api_app_id": "HIJK", - "event": { - "client_msg_id": "1234-5678", - "type": "app_mention", - "text": "<@BOTID> <@USERID> <@USERID2>", - "user": "USERID", - "ts": "1.2", - "blocks": [ - { - "type": "rich_text", - "block_id": "tx381", - "elements": [ - { - "type": "rich_text_section", - "elements": [ - {"type": "user", "user_id": "BOTID"}, - {"type": "text", "text": " "}, - {"type": "user", "user_id": "USERID"}, - {"type": "text", "text": " "}, - {"type": "user", "user_id": "USERID2"}, - ], - } - ], - } - ], - "team": "ABCD", - "channel": "EFGH", - "event_ts": "1.2", - }, - "type": "event_callback", - "event_id": "ABCD", - "event_time": 1707423091, - "authorizations": [ - {"enterprise_id": None, "team_id": "ABCD", "user_id": "BOTID", "is_bot": True, "is_enterprise_install": False} - ], - "is_ext_shared_channel": False, - "event_context": "SOMELONGCONTEXT", -} -DIRECT_MESSAGE_PAYLOAD = { - "token": "ABCD", - "team_id": "EFGH", - "context_team_id": "ABCD", - "context_enterprise_id": None, - "api_app_id": "HIJK", - "event": { - "client_msg_id": "1234-5678", - "type": "message", - "text": "test", - "user": "USERID", - "ts": "2.1", - "blocks": [ - { - "type": "rich_text", - "block_id": "gB9fq", - "elements": [{"type": "rich_text_section", "elements": [{"type": "text", "text": "test"}]}], - } - ], - "team": "ABCD", - "channel": "EFGH", - "event_ts": "2.1", - "channel_type": "im", - }, - "type": "event_callback", - "event_id": "ABCD", - "event_time": 1707423220, - "authorizations": [ - {"enterprise_id": None, "team_id": "ABCD", "user_id": "BOTID", "is_bot": True, "is_enterprise_install": False} - ], - "is_ext_shared_channel": False, - "event_context": "SOMELONGCONTEXT", -} - - -class TestSlack: - def test_slack_tokens(self): - with pytest.raises(RuntimeError): - SlackAdapterManager("abc", "def") - - @pytest.mark.parametrize("payload", (PUBLIC_CHANNEL_MENTION_PAYLOAD, DIRECT_MESSAGE_PAYLOAD)) - def test_slack(self, payload): - with patch("csp.adapters.slack.SocketModeClient") as clientmock: - # mock out the event from the slack sdk - reqmock = MagicMock() - reqmock.type = "events_api" - reqmock.payload = payload - - # mock out the user/room lookup responses - mock_user_response = MagicMock(name="users_info_mock") - mock_user_response.status_code = 200 - mock_user_response.data = { - "user": {"profile": {"real_name_normalized": "johndoe", "email": "johndoe@some.email"}, "name": "blerg"} - } - clientmock.return_value.web_client.users_info.return_value = mock_user_response - mock_room_response = MagicMock(name="conversations_info_mock") - mock_room_response.status_code = 200 - mock_room_response.data = {"channel": {"is_im": False, "is_private": True, "name": "a private channel"}} - clientmock.return_value.web_client.conversations_info.return_value = mock_room_response - mock_list_response = MagicMock(name="conversations_list_mock") - mock_list_response.status_code = 200 - mock_list_response.data = { - "channels": [ - {"name": "a private channel", "id": "EFGH"}, - {"name": "a new channel", "id": "new_channel"}, - ] - } - clientmock.return_value.web_client.conversations_list.return_value = mock_list_response - - def graph(): - am = SlackAdapterManager("xapp-1-dummy", "xoxb-dummy", ssl=create_default_context()) - - # send a fake slack message to the app - stop = send_fake_message(clientmock, reqmock, am) - - # send a response - resp = hello(csp.unroll(am.subscribe())) - am.publish(resp) - - # do a react - rct = react(csp.unroll(am.subscribe())) - am.publish(rct) - - csp.add_graph_output("response", resp) - csp.add_graph_output("react", rct) - - # stop after first messages - done_flag = (csp.count(stop) + csp.count(resp) + csp.count(rct)) == 3 - csp.stop_engine(stop) - - # run the graph - resp = csp.run(graph, realtime=True) - - # check outputs - if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: - assert resp["react"] - assert resp["response"] - - assert resp["react"][0][1] == SlackMessage( - channel="a private channel", channel_id="EFGH", reaction="eyes", thread="1.2" - ) - assert resp["response"][0][1] == SlackMessage( - channel="a new channel", msg="Hello <@USERID>!", thread="1.2" - ) - else: - assert resp["react"] - assert resp["response"] - - assert resp["react"][0][1] == SlackMessage( - channel="a private channel", channel_id="EFGH", reaction="eyes", thread="2.1" - ) - assert resp["response"][0][1] == SlackMessage( - channel="a new channel", msg="Hello <@USERID>!", thread="2.1" - ) - - # check all inbound mocks got called - if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: - assert clientmock.return_value.web_client.users_info.call_count == 2 - else: - assert clientmock.return_value.web_client.users_info.call_count == 1 - assert clientmock.return_value.web_client.conversations_info.call_count == 1 - - # check all outbound mocks got called - assert clientmock.return_value.web_client.reactions_add.call_count == 1 - assert clientmock.return_value.web_client.chat_postMessage.call_count == 1 - - if payload == PUBLIC_CHANNEL_MENTION_PAYLOAD: - assert clientmock.return_value.web_client.reactions_add.call_args_list == [ - call(channel="EFGH", name="eyes", timestamp="1.2") - ] - assert clientmock.return_value.web_client.chat_postMessage.call_args_list == [ - call(channel="new_channel", text="Hello <@USERID>!") - ] - else: - assert clientmock.return_value.web_client.reactions_add.call_args_list == [ - call(channel="EFGH", name="eyes", timestamp="2.1") - ] - assert clientmock.return_value.web_client.chat_postMessage.call_args_list == [ - call(channel="new_channel", text="Hello <@USERID>!") - ] - - def test_mention_user(self): - assert mention_user("ABCD") == "<@ABCD>" diff --git a/docs/wiki/api-references/Input-Output-Adapters-API.md b/docs/wiki/api-references/Input-Output-Adapters-API.md index 7ed5976e7..edecae372 100644 --- a/docs/wiki/api-references/Input-Output-Adapters-API.md +++ b/docs/wiki/api-references/Input-Output-Adapters-API.md @@ -15,7 +15,6 @@ - [Publishing](#publishing) - [DBReader](#dbreader) - [TimeAccessor](#timeaccessor) -- [Slack](#slack) ## Kafka @@ -349,7 +348,3 @@ Both of these calls expect `typ` to be a `csp.Struct` type. `subscribe` is used to subscribe to a stream for the given symbol (symbol_column is required when creating DBReader) `subscribe_all` is used to retrieve all the data resulting from the request as a single timeseries. - -## Slack - -The Slack adapter allows for reading and writing of messages from the [Slack](https://slack.com) message platform using the [Slack Python SDK](https://slack.dev/python-slack-sdk/). diff --git a/pyproject.toml b/pyproject.toml index 511405e8e..fb02c2fb5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,6 @@ develop = [ "httpx>=0.20,<1", # kafka "polars", # parquet "psutil", # test_engine/test_history - "slack-sdk>=3", # slack "sqlalchemy", # db "threadpoolctl", # test_random "tornado", # profiler, perspective, websocket @@ -108,7 +107,7 @@ symphony = [ "csp-adapter-symphony", ] slack = [ - "slack-sdk>=3", + "csp-adapter-slack", ] [tool.check-manifest]