Gb/api errors #3

Merged · 6 commits · Oct 20, 2023
elm/base.py (96 changes: 85 additions & 11 deletions)
@@ -3,6 +3,7 @@
ELM abstract class for API calls
"""
from abc import ABC
+import numpy as np
import asyncio
import aiohttp
import openai
@@ -48,8 +49,27 @@ def __init__(self, model=None):
cls.DEFAULT_MODEL
"""
self.model = model or self.DEFAULT_MODEL
-        self.chat_messages = [{"role": "system", "content": self.MODEL_ROLE}]
        self.api_queue = None
+        self.messages = []
+        self.clear()

+    @property
+    def all_messages_txt(self):
+        """Get a string printout of the full conversation with the LLM
+
+        Returns
+        -------
+        str
+        """
+        messages = [f"{msg['role'].upper()}: {msg['content']}"
+                    for msg in self.messages]
+        messages = '\n\n'.join(messages)
+        return messages
+
+    def clear(self):
+        """Clear chat history and reduce messages to just the initial model
+        role message."""
+        self.messages = [{"role": "system", "content": self.MODEL_ROLE}]

@staticmethod
async def call_api(url, headers, request_json):
@@ -96,7 +116,7 @@ async def call_api(url, headers, request_json):
return out

async def call_api_async(self, url, headers, all_request_jsons,
-                             rate_limit=40e3):
+                             ignore_error=None, rate_limit=40e3):
Make many calls to the OpenAI API asynchronously and in parallel.

NOTE: you need to call this using the await command in ipython or
@@ -119,6 +139,10 @@ async def call_api_async(self, url, headers, all_request_jsons,
"messages": [{"role": "system", "content": "You do this..."},
{"role": "user", "content": "Do this: {}"}],
"temperature": 0.0}
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
rate_limit : float
OpenAI API rate limit (tokens / minute). Note that the
gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -132,6 +156,7 @@ async def call_api_async(self, url, headers, all_request_jsons,
corresponding message in the all_request_jsons input.
"""
self.api_queue = ApiQueue(url, headers, all_request_jsons,
+                                  ignore_error=ignore_error,
rate_limit=rate_limit)
out = await self.api_queue.run()
return out
@@ -155,18 +180,18 @@ def chat(self, query, temperature=0):
Model response
"""

-        self.chat_messages.append({"role": "user", "content": query})
+        self.messages.append({"role": "user", "content": query})

kwargs = dict(model=self.model,
-                      messages=self.chat_messages,
+                      messages=self.messages,
temperature=temperature,
stream=False)
if 'azure' in str(openai.api_type).lower():
kwargs['engine'] = self.model

response = openai.ChatCompletion.create(**kwargs)
response = response["choices"][0]["message"]["content"]
-        self.chat_messages.append({'role': 'assistant', 'content': response})
+        self.messages.append({'role': 'assistant', 'content': response})

return response

@@ -207,7 +232,8 @@ def generic_query(self, query, model_role=None, temperature=0):
return response

async def generic_async_query(self, queries, model_role=None,
-                                  temperature=0, rate_limit=40e3):
+                                  temperature=0, ignore_error=None,
+                                  rate_limit=40e3):
"""Run a number of generic single queries asynchronously
(not conversational)

Expand All @@ -225,6 +251,10 @@ async def generic_async_query(self, queries, model_role=None,
GPT model temperature, a measure of response entropy from 0 to 1. 0
is more reliable and nearly deterministic; 1 will give the model
more creative freedom and may return less factual results.
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
rate_limit : float
OpenAI API rate limit (tokens / minute). Note that the
gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -247,6 +277,7 @@ async def generic_async_query(self, queries, model_role=None,
all_request_jsons.append(req)

self.api_queue = ApiQueue(self.URL, self.HEADERS, all_request_jsons,
+                                  ignore_error=ignore_error,
rate_limit=rate_limit)
out = await self.api_queue.run()

@@ -324,7 +355,8 @@ def count_tokens(text, model):
class ApiQueue:
"""Class to manage the parallel API queue and submission"""

-    def __init__(self, url, headers, request_jsons, rate_limit=40e3):
+    def __init__(self, url, headers, request_jsons, ignore_error=None,
+                 rate_limit=40e3, max_retries=5):
"""
Parameters
----------
@@ -343,21 +375,32 @@ def __init__(self, url, headers, request_jsons, rate_limit=40e3):
"messages": [{"role": "system", "content": "You do this..."},
{"role": "user", "content": "Do this: {}"}],
"temperature": 0.0}
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
rate_limit : float
OpenAI API rate limit (tokens / minute). Note that the
gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
factor of safety (~1/2) because we can only count the tokens on the
input side and assume the output is about the same count.
+        max_retries : int
+            Number of times to retry an API call with an error response before
+            raising an error.
"""

self.url = url
self.headers = headers
self.request_jsons = request_jsons
+        self.ignore_error = ignore_error
        self.rate_limit = rate_limit
+        self.max_retries = max_retries

self.api_jobs = {}
self.todo = [True] * len(self)
self.out = [None] * len(self)
+        self.errors = [None] * len(self)
+        self.tries = np.zeros(len(self))

def __len__(self):
"""Number of API calls to submit"""
@@ -380,12 +423,14 @@ def submit_jobs(self):
self.headers,
request))
self.api_jobs[i] = task
+                self.tries[i] += 1

logger.debug('Submitted {} out of {}, '
'token count is at {} '
-                             '(rate limit is {})'
+                             '(rate limit is {}). '
+                             'Max attempts for a job is {}'
                             .format(i + 1, len(self), token_count,
-                                    self.rate_limit))
+                                    self.rate_limit, int(self.tries.max())))

elif token_count >= self.rate_limit:
token_count = 0
@@ -401,8 +446,24 @@ async def collect_jobs(self):
task_out = await self.api_jobs[i]

if 'error' in task_out:
-                logger.error('Received API error for task #{}: {}'
-                             .format(i + 1, task_out))
+                msg = ('Received API error for task #{0} '
+                       '(see `ApiQueue.errors[{1}]` and '
+                       '`ApiQueue.request_jsons[{1}]` for more details). '
+                       'Error message: {2}'.format(i + 1, i, task_out))
+                self.errors[i] = 'Error: {}'.format(task_out)
+
+                if (self.ignore_error is not None
+                        and self.ignore_error(str(task_out))):
+                    msg += ' Ignoring error and moving on.'
+                    dummy = {'choices': [{'message': {'content': ''}}]}
+                    self.out[i] = dummy
+                    self.todo[i] = False
+                    complete = len(self) - sum(self.todo)
+                else:
+                    msg += ' Retrying query.'
+
+                logger.error(msg)

else:
self.out[i] = task_out
self.todo[i] = False
@@ -423,8 +484,21 @@ async def run(self):

logger.debug('Submitting async API calls...')

+        self.api_jobs = {}
+        self.todo = [True] * len(self)
+        self.out = [None] * len(self)
+        self.errors = [None] * len(self)
+        self.tries = np.zeros(len(self))

while any(self.todo):
self.submit_jobs()
await self.collect_jobs()

+            if any(self.tries > self.max_retries):
+                msg = (f'Hit {self.max_retries} retries on API queries. '
+                       'Stopping. See `ApiQueue.errors` for more '
+                       'details on error response')
+                logger.error(msg)
+                raise RuntimeError(msg)

return self.out
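
The thread running through these elm/base.py changes: every async entry point now accepts an ignore_error callable and forwards it to ApiQueue, which counts attempts per job in self.tries and raises once any job exceeds max_retries. ApiQueue.run() also resets its bookkeeping (api_jobs, todo, out, errors, tries) on every call, so a queue can be safely re-run. A minimal usage sketch follows; the MyApi subclass, its MODEL_ROLE, and the error substring are illustrative assumptions, not code from this PR.

# Hedged usage sketch -- MyApi, its MODEL_ROLE, and the error substring
# below are assumptions for illustration only.
import asyncio

from elm.base import ApiBase


class MyApi(ApiBase):
    """Minimal ApiBase subclass for illustration."""
    MODEL_ROLE = "You are a research assistant."


def skip_content_filter(error_str):
    """Return True to drop a failed call (its output becomes an empty
    string) instead of retrying it up to ApiQueue's max_retries."""
    return 'content management policy' in error_str.lower()


async def main():
    api = MyApi()
    out = await api.generic_async_query(
        ['Summarize document A ...', 'Summarize document B ...'],
        ignore_error=skip_content_filter,
        rate_limit=40e3)
    # Ignored failures come back as '' ; the raw error strings are kept
    # in api.api_queue.errors for inspection.
    return out


# In a script: results = asyncio.run(main())
# In ipython/jupyter (per the docstrings above): results = await main()
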
elm/pdf.py (41 changes: 24 additions & 17 deletions)
@@ -16,6 +16,18 @@
class PDFtoTXT(ApiBase):
"""Class to parse text from a PDF document."""

+    MODEL_ROLE = ('You clean up poorly formatted text '
+                  'extracted from PDF documents.')
+    """High level model role."""
+
+    MODEL_INSTRUCTION = ('Text extracted from a PDF: '
+                         '\n"""\n{}\n"""\n\n'
+                         'The text above was extracted from a PDF document. '
+                         'Can you make it nicely formatted? '
+                         'Please only return the formatted text '
+                         'without comments or added information.')
+    """Instructions to the model with python format braces for pdf text"""

def __init__(self, fp, page_range=None, model=None):
"""
Parameters
Expand Down Expand Up @@ -68,14 +80,11 @@ def load_pdf(self, page_range):
.format(i + 1 + page_range.start, len(pdf.pages)))
else:
out.append(page_text)
-                logger.debug('Loaded page {} out of {}'
-                             .format(i + 1 + page_range.start, len(pdf.pages)))

logger.info('Finished loading PDF.')
return out

-    @staticmethod
-    def make_gpt_messages(pdf_raw_text):
+    def make_gpt_messages(self, pdf_raw_text):
"""Make the chat completion messages list for input to GPT

Parameters
Expand All @@ -91,16 +100,9 @@ def make_gpt_messages(pdf_raw_text):
[{"role": "system", "content": "You do this..."},
{"role": "user", "content": "Please do this: {}"}]
"""
-        query = ('Text extracted from a PDF: '
-                 '\"\"\"\n{}\"\"\"\n\n'
-                 'The text above was extracted from a PDF document. '
-                 'Can you make it nicely formatted? '
-                 'Please only return the formatted text, nothing else.'
-                 .format(pdf_raw_text))
-
-        role_str = ('You clean up poorly formatted text '
-                    'extracted from PDF documents.')
-        messages = [{"role": "system", "content": role_str},
+        query = self.MODEL_INSTRUCTION.format(pdf_raw_text)
+        messages = [{"role": "system", "content": self.MODEL_ROLE},
{"role": "user", "content": query}]

return messages
@@ -147,14 +149,18 @@ def clean_txt(self):

return clean_pages

-    async def clean_txt_async(self, rate_limit=40e3):
+    async def clean_txt_async(self, ignore_error=None, rate_limit=40e3):
"""Use GPT to clean raw pdf text in parallel calls to the OpenAI API.

NOTE: you need to call this using the await command in ipython or
jupyter, e.g.: `out = await PDFtoTXT.clean_txt_async()`

Parameters
----------
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
rate_limit : float
OpenAI API rate limit (tokens / minute). Note that the
gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -178,6 +184,7 @@ async def clean_txt_async(self, rate_limit=40e3):

clean_pages = await self.call_api_async(self.URL, self.HEADERS,
all_request_jsons,
+                                                 ignore_error=ignore_error,
rate_limit=rate_limit)

for i, page in enumerate(clean_pages):
@@ -212,8 +219,8 @@ def replace_chars_for_clean(text):
raw_words = replace_chars_for_clean(raw).split(' ')
clean_words = replace_chars_for_clean(clean).split(' ')

-        raw_words = set([x for x in raw_words if len(x) > 2])
-        clean_words = set([x for x in clean_words if len(x) > 2])
+        raw_words = {x for x in raw_words if len(x) > 2}
+        clean_words = {x for x in clean_words if len(x) > 2}

isin = sum(x in clean_words for x in raw_words)

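
Because the pdf prompts now live in the MODEL_ROLE and MODEL_INSTRUCTION class attributes, they can be overridden without reimplementing make_gpt_messages, and ignore_error passes straight through clean_txt_async. A minimal sketch, assuming an illustrative file path and error predicate, run from ipython/jupyter so that await works at the top level:

# Sketch only: the file path and error predicate are illustrative.
from elm.pdf import PDFtoTXT

pdf = PDFtoTXT('./example_report.pdf')

# Optionally override the new class attributes to customize the prompt:
# PDFtoTXT.MODEL_INSTRUCTION = 'Text extracted from a PDF: ...'

clean_pages = await pdf.clean_txt_async(
    ignore_error=lambda err: 'server error' in err.lower(),
    rate_limit=40e3)
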
elm/summary.py (13 changes: 11 additions & 2 deletions)
@@ -99,6 +99,8 @@ def run(self, temperature=0, fancy_combine=True):
Summary of text.
"""

+        logger.info('Summarizing {} text chunks in serial...'
+                    .format(len(self.text_chunks)))
summary = ''

for i, chunk in enumerate(self.text_chunks):
@@ -115,10 +117,12 @@ def run(self, temperature=0, fancy_combine=True):
if fancy_combine:
summary = self.combine(summary)

+        logger.info('Finished all summaries.')

return summary

-    async def run_async(self, temperature=0, rate_limit=40e3,
-                        fancy_combine=True):
+    async def run_async(self, temperature=0, ignore_error=None,
+                        rate_limit=40e3, fancy_combine=True):
"""Run text summary asynchronously for all text chunks

NOTE: you need to call this using the await command in ipython or
@@ -130,6 +134,10 @@ async def run_async(self, temperature=0, rate_limit=40e3,
GPT model temperature, a measure of response entropy from 0 to 1. 0
is more reliable and nearly deterministic; 1 will give the model
more creative freedom and may return less factual results.
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
rate_limit : float
OpenAI API rate limit (tokens / minute). Note that the
gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
Expand Down Expand Up @@ -157,6 +165,7 @@ async def run_async(self, temperature=0, rate_limit=40e3,
summaries = await self.generic_async_query(queries,
model_role=self.MODEL_ROLE,
temperature=temperature,
+                                                   ignore_error=ignore_error,
rate_limit=rate_limit)

self.summary_chunks = summaries
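
elm/summary.py gets the same pass-through, plus start/finish logging around the serial path. A hedged sketch follows; the class name Summary and its constructor call are assumed from context and do not appear in this diff:

# Assumed usage -- only run_async's new ignore_error kwarg is shown in
# this diff; the Summary name and constructor call are assumptions.
from elm.summary import Summary

summ = Summary(long_text)  # hypothetical: long_text is your input document

final_summary = await summ.run_async(
    temperature=0,
    ignore_error=lambda err: 'InvalidRequestError' in err,
    rate_limit=40e3,
    fancy_combine=True)
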
elm/tree.py (7 changes: 2 additions & 5 deletions)
@@ -80,7 +80,7 @@ def messages(self):
-------
list
"""
-        return self.api.chat_messages
+        return self.api.messages

@property
def all_messages_txt(self):
Expand All @@ -90,10 +90,7 @@ def all_messages_txt(self):
-------
str
"""
-        messages = [f"{msg['role'].upper()}: {msg['content']}"
-                    for msg in self.messages]
-        messages = '\n\n'.join(messages)
-        return messages
+        return self.api.all_messages_txt

@property
def history(self):
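
With the conversation helpers consolidated on ApiBase, the tree class simply delegates, and any ApiBase instance now exposes the same utilities directly. Continuing the hypothetical MyApi subclass from the elm/base.py sketch above:

# Sketch reusing the hypothetical MyApi subclass defined earlier.
api = MyApi()
api.chat('What is an LLM?')   # appends a user and an assistant message

print(api.all_messages_txt)   # "SYSTEM: ...\n\nUSER: ...\n\nASSISTANT: ..."

api.clear()                   # reset history to just the MODEL_ROLE message
assert len(api.messages) == 1
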