From 55f4c5ee4da2f7052e3905e836e82deeca4f2dcd Mon Sep 17 00:00:00 2001 From: Mark <130257905+kramcat@users.noreply.github.com> Date: Tue, 16 May 2023 23:23:58 +0300 Subject: [PATCH] new v0.4.0 --- characterai/__init__.py | 3 + characterai/characterai.py | 160 +++++++++++++++++++++++++++++++++++++ characterai/errors.py | 8 ++ characterai/pyasynccai.py | 159 ++++++++++++++++++++++++++++++++++++ 4 files changed, 330 insertions(+) create mode 100644 characterai/__init__.py create mode 100644 characterai/characterai.py create mode 100644 characterai/errors.py create mode 100644 characterai/pyasynccai.py diff --git a/characterai/__init__.py b/characterai/__init__.py new file mode 100644 index 0000000..5ec1744 --- /dev/null +++ b/characterai/__init__.py @@ -0,0 +1,3 @@ +from characterai.characterai import pyCAI, pyAsyncCAI + +__all__ = ['pyCAI', 'pyAsyncCAI'] \ No newline at end of file diff --git a/characterai/characterai.py b/characterai/characterai.py new file mode 100644 index 0000000..cd066e9 --- /dev/null +++ b/characterai/characterai.py @@ -0,0 +1,160 @@ +from typing import Dict +import json + +from playwright.sync_api import sync_playwright + +from characterai import errors +from characterai.pyasynccai import pyAsyncCAI + +__all__ = ['pyCAI', 'pyAsyncCAI'] + +page = None + +def goto(link: str, *, wait: bool = False, token: str = None): + if token != None: + page.set_extra_http_headers({"Authorization": f"Token {token}"}) + + page.goto(link) + + if page.title() != 'Waiting Room powered by Cloudflare': + if page.get_by_role("button", name="Accept").is_visible(): + page.get_by_role("button", name="Accept").click() + + return page + + else: + if wait: + page.wait_for_selector('div#wrapper', state='detached', timeout=0) + goto(link=link, wait=wait) + else: + raise errors.NoResponse('The Site is Overloaded') + +def GetResponse(link: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + goto(f'https://beta.character.ai/{link}/', wait=wait, token=token) + data = json.loads(page.locator('body').inner_text()) + + return data + + +class pyCAI: + def __init__(self, token: str, *, headless: bool = True): + global page + + self.token = token + self.headless = headless + + self.browser = sync_playwright().start().firefox.launch(headless=headless) + self.context = self.browser.new_context( + extra_http_headers={"Authorization": f"Token {self.token}"} + ) + page = self.context.new_page() + + self.user = self.user() + self.character = self.character() + self.chat = self.chat() + + class user: + """ + Just a Responses from site for user info + + user.info() + """ + + def info(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(link='chat/user', wait=wait, token=token) + + def posts(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(link='chat/posts/user/?scope=user&page=1&posts_to_load=5', wait=wait, token=token) + + def followers(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(link='chat/user/followers', wait=wait, token=token) + + def following(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(link='chat/user/following', wait=wait, token=token) + + def recent(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse('chat/characters/recent/', wait=wait, token=token) + + class character: + """ + Just a Responses from site for characters + + character.trending() + character.info('CHAR') + character.search('SEARCH') + """ + def trending(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(link='chat/characters/trending', wait=wait, token=token) + + def recommended(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(link='chat/characters/recommended', wait=wait, token=token) + + def categories(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(link='chat/character/categories', wait=wait, token=token) + + def info(self, char: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + data = GetResponse(f'chat/character/info-cached/{char}/', wait=wait, token=token) + + if data != "{'error': 'Server Error (500)'}": + return data + else: + raise errors.CharNotFound('Wrong Char') + + def search(self, search, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return GetResponse(f'chat/characters/search/?query={search}', wait=wait, token=token) + + class chat: + def get_history(self, char: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + """ + Getting character chat history, return json response + + chat.get_history('CHAR') + """ + goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + if page.query_selector('h1') == None: + with page.expect_response(lambda response: response.url.startswith('https://beta.character.ai/chat/history/msgs/user/')) as response_info: + goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + response = response_info.value.text() + return json.loads(response) + else: + raise errors.CharNotFound('Wrong Char') + + def send_message(self, char: str, message: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + """ + Sending a message, return json + + chat.send_message('CHAR', 'MESSAGE') + """ + goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + # BIG THANKS - HearYourWaifu + page.evaluate(""" + const { fetch: origFetch } = window; + window.fetch = async (...args) => { + const response = await origFetch(...args); + const raw_text = await new Response(response.clone().body).text(); + return response;};""") + + with page.expect_response("https://beta.character.ai/chat/streaming/") as response_info: + page.get_by_placeholder("Type a message").fill(message) + page.get_by_role("button", name="Submit Message").click() + + response = response_info.value.text() + return json.loads('{"replies": ' + response.split(r'{"replies": ')[-1]) + + def new_chat(self, char: str, *, wait: bool = False, token: str = None) -> None: + """ + Starting new chat, return json + + chat.new_chat('CHAR') + """ + goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + with page.expect_response("https://beta.character.ai/chat/history/create/") as response_info: + page.wait_for_selector('.col-auto.px-2.dropdown').click() + page.wait_for_selector('"Save and Start New Chat"').click() + + response = response_info.value.text() + return json.loads(response) \ No newline at end of file diff --git a/characterai/errors.py b/characterai/errors.py new file mode 100644 index 0000000..3f350fc --- /dev/null +++ b/characterai/errors.py @@ -0,0 +1,8 @@ +class pyCAIError(Exception): + pass + +class NoResponse(pyCAIError): + pass + +class CharNotFound(pyCAIError): + pass \ No newline at end of file diff --git a/characterai/pyasynccai.py b/characterai/pyasynccai.py new file mode 100644 index 0000000..f5be25a --- /dev/null +++ b/characterai/pyasynccai.py @@ -0,0 +1,159 @@ +from typing import Dict +import asyncio +import json + +from playwright.async_api import async_playwright + +from characterai import errors + +__all__ = ['pyCAI', 'pyAsyncCAI'] + +page = None + +async def goto(link: str, *, wait: bool = False, token: str = None): + if token != None: + await page.set_extra_http_headers({"Authorization": f"Token {token}"}) + + await page.goto(link) + + if await page.title() != 'Waiting Room powered by Cloudflare': + if await page.get_by_role("button", name="Accept").is_visible(): + await page.get_by_role("button", name="Accept").click() + + return page + + else: + if wait: + await page.wait_for_selector('div#wrapper', state='detached', timeout=0) + await goto(link=link, wait=wait) + else: + raise errors.NoResponse('The Site is Overloaded') + +async def GetResponse(link: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + await goto(f'https://beta.character.ai/{link}/', wait=wait, token=token) + data = json.loads(await (page.locator('body').inner_text())) + + return data + + +class pyAsyncCAI: + def __init__(self, token: str): + self.token = token + + self.user = self.user() + self.character = self.character() + self.chat = self.chat() + + async def start(self, *, headless: bool = True): + global page + + self.browser = await (await async_playwright().start()).firefox.launch(headless=headless) + self.context = await self.browser.new_context( + extra_http_headers={"Authorization": f"Token {self.token}"} + ) + page = await self.context.new_page() + + class user: + """ + Just a Responses from site for user info + + user.info() + """ + async def info(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/user', wait=wait, token=token) + + async def posts(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/posts/user/?scope=user&page=1&posts_to_load=5', wait=wait, token=token) + + async def followers(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/user/followers', wait=wait, token=token) + + async def following(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/user/following', wait=wait, token=token) + + async def recent(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/characters/recent', wait=wait, token=token) + + class character: + """ + Just a Responses from site for characters + + character.trending() + character.info('CHAR') + character.search('SEARCH') + """ + async def trending(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/characters/trending', wait=wait, token=token) + + async def recommended(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/characters/recommended', wait=wait, token=token) + + async def categories(self, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse('chat/character/categories', wait=wait, token=token) + + async def info(self, char: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + data = await GetResponse(f'chat/character/info-cached/{char}/', wait=wait, token=token) + + if data != "{'error': 'Server Error (500)'}": + return data + else: + raise errors.CharNotFound('Wrong Char') + + async def search(self, search, *, wait: bool = False, token: str = None) -> Dict[str, str]: + return await GetResponse(f'chat/characters/search/?query={search}', wait=wait, token=token) + + class chat: + async def get_history(self, char: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + """ + Getting character chat history, return json response + + chat.get_history('CHAR') + """ + await goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + if await page.query_selector('h1') == None: + async with page.expect_response(lambda response: response.url.startswith('https://beta.character.ai/chat/history/msgs/user/')) as response_info: + await goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + response = await (await response_info.value).text() + return json.loads(response) + else: + raise errors.CharNotFound('Wrong Char') + + async def send_message(self, char: str, message: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + """ + Sending a message, return json + + chat.send_message('CHAR', 'MESSAGE') + """ + await goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + # BIG THANKS - HearYourWaifu + await page.evaluate(""" + const { fetch: origFetch } = window; + window.fetch = async (...args) => { + const response = await origFetch(...args); + const raw_text = await new Response(response.clone().body).text(); + return response;};""") + + async with page.expect_response("https://beta.character.ai/chat/streaming/") as response_info: + await page.get_by_placeholder("Type a message").fill(message) + await page.get_by_role("button", name="Submit Message").click() + + response = await (await response_info.value).text() + return json.loads('{"replies": ' + response.split(r'{"replies": ')[-1]) + + async def new_chat(self, char: str, *, wait: bool = False, token: str = None) -> Dict[str, str]: + """ + Starting new chat, return json + + chat.new_chat('CHAR') + """ + await goto(f'https://beta.character.ai/chat?char={char}', wait=wait, token=token) + + async with page.expect_response("https://beta.character.ai/chat/history/create/") as response_info: + await (await page.wait_for_selector('.col-auto.px-2.dropdown')).click() + await (await page.wait_for_selector('"Save and Start New Chat"')).click() + + response = await (await response_info.value).text() + return json.loads(response) \ No newline at end of file