Commit: added `ignore_error` kwarg (a callable) to bypass some errors
grantbuster committed Oct 17, 2023
1 parent acf9380 commit 84f0dc3
Showing 3 changed files with 69 additions and 22 deletions.
41 changes: 36 additions & 5 deletions elm/base.py
@@ -96,7 +96,7 @@ async def call_api(url, headers, request_json):
         return out

     async def call_api_async(self, url, headers, all_request_jsons,
-                             rate_limit=40e3):
+                             ignore_error=None, rate_limit=40e3):
         """Use GPT to clean raw pdf text in parallel calls to the OpenAI API.

         NOTE: you need to call this using the await command in ipython or
@@ -119,6 +119,10 @@ async def call_api_async(self, url, headers, all_request_jsons,
                  "messages": [{"role": "system", "content": "You do this..."},
                               {"role": "user", "content": "Do this: {}"}],
                  "temperature": 0.0}
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
         rate_limit : float
             OpenAI API rate limit (tokens / minute). Note that the
             gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -132,6 +136,7 @@ async def call_api_async(self, url, headers, all_request_jsons,
             corresponding message in the all_request_jsons input.
         """
         self.api_queue = ApiQueue(url, headers, all_request_jsons,
+                                  ignore_error=ignore_error,
                                   rate_limit=rate_limit)
         out = await self.api_queue.run()
         return out
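With the new kwarg, callers decide which API errors are fatal. A minimal usage sketch (the `llm` instance, the request list, and the predicate below are hypothetical illustrations, not part of this commit):

    # In ipython/jupyter, assuming `llm` is an instantiated ApiBase subclass
    # and `all_request_jsons` is a list of chat-completion request dicts.
    def skip_long_prompt_errors(error_str):
        # Return True to swallow the error: no retry, output becomes ''.
        return 'maximum context length' in error_str

    out = await llm.call_api_async(llm.URL, llm.HEADERS, all_request_jsons,
                                   ignore_error=skip_long_prompt_errors,
                                   rate_limit=40e3)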
@@ -207,7 +212,8 @@ def generic_query(self, query, model_role=None, temperature=0):
         return response

     async def generic_async_query(self, queries, model_role=None,
-                                  temperature=0, rate_limit=40e3):
+                                  temperature=0, ignore_error=None,
+                                  rate_limit=40e3):
         """Run a number of generic single queries asynchronously
         (not conversational)

@@ -225,6 +231,10 @@ async def generic_async_query(self, queries, model_role=None,
             GPT model temperature, a measure of response entropy from 0 to 1. 0
             is more reliable and nearly deterministic; 1 will give the model
             more creative freedom and may not return as factual of results.
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
         rate_limit : float
             OpenAI API rate limit (tokens / minute). Note that the
             gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -247,6 +257,7 @@ async def generic_async_query(self, queries, model_role=None,
             all_request_jsons.append(req)

         self.api_queue = ApiQueue(self.URL, self.HEADERS, all_request_jsons,
+                                  ignore_error=ignore_error,
                                   rate_limit=rate_limit)
         out = await self.api_queue.run()

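The same kwarg threads through generic_async_query. A sketch, again with a hypothetical `llm` instance and an assumed error-string format:

    queries = ['Summarize the first document...',
               'Summarize the second document...']

    # Hypothetical policy: give up on bad-request errors, retry everything else.
    out = await llm.generic_async_query(queries,
                                        model_role='You summarize text.',
                                        temperature=0,
                                        ignore_error=lambda e: 'Bad Request' in e)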
@@ -324,7 +335,8 @@ def count_tokens(text, model):
 class ApiQueue:
     """Class to manage the parallel API queue and submission"""

-    def __init__(self, url, headers, request_jsons, rate_limit=40e3):
+    def __init__(self, url, headers, request_jsons, ignore_error=None,
+                 rate_limit=40e3):
         """
         Parameters
         ----------
@@ -343,6 +355,10 @@ def __init__(self, url, headers, request_jsons, rate_limit=40e3):
                 "messages": [{"role": "system", "content": "You do this..."},
                              {"role": "user", "content": "Do this: {}"}],
                 "temperature": 0.0}
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
         rate_limit : float
             OpenAI API rate limit (tokens / minute). Note that the
             gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -353,11 +369,13 @@ def __init__(self, url, headers, request_jsons, rate_limit=40e3):
         self.url = url
         self.headers = headers
         self.request_jsons = request_jsons
+        self.ignore_error = ignore_error
         self.rate_limit = rate_limit

         self.api_jobs = {}
         self.todo = [True] * len(self)
         self.out = [None] * len(self)
+        self.errors = [None] * len(self)

     def __len__(self):
         """Number of API calls to submit"""
@@ -401,8 +419,21 @@ async def collect_jobs(self):
             task_out = await self.api_jobs[i]

             if 'error' in task_out:
-                logger.error('Received API error for task #{}: {}'
-                             .format(i + 1, task_out))
+                msg = ('Received API error for task #{0} '
+                       '(see `ApiQueue.errors[{1}]` and '
+                       '`ApiQueue.request_jsons[{1}]` for more details). '
+                       'Error message: {2}'.format(i + 1, i, task_out))
+                self.errors[i] = 'Error: {}'.format(task_out)
+                if (self.ignore_error is not None
+                        and self.ignore_error(str(task_out))):
+                    msg += ' Ignoring error and moving on.'
+                    dummy = {'choices': [{'message': {'content': ''}}]}
+                    self.out[i] = dummy
+                    self.todo[i] = False
+                    complete = len(self) - sum(self.todo)
+                else:
+                    msg += ' Retrying query.'
+                logger.error(msg)
             else:
                 self.out[i] = task_out
                 self.todo[i] = False
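Two details worth noting in collect_jobs: the raw error string is now kept in ApiQueue.errors[i] whether or not the error is ignored, and an ignored task is filled with a dummy payload that mimics the shape of a successful chat completion, so downstream parsing still works and simply sees an empty string:

    # Placeholder stored in ApiQueue.out[i] when an error is ignored:
    dummy = {'choices': [{'message': {'content': ''}}]}

    # Downstream code that digs out the message content gets '' instead of
    # crashing or triggering another retry:
    assert dummy['choices'][0]['message']['content'] == ''

If ignore_error is None or returns False, the task stays in the todo list and is retried on the next pass.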
37 changes: 22 additions & 15 deletions elm/pdf.py
@@ -16,6 +16,18 @@
 class PDFtoTXT(ApiBase):
     """Class to parse text from a PDF document."""

+    MODEL_ROLE = ('You clean up poorly formatted text '
+                  'extracted from PDF documents.')
+    """High level model role."""
+
+    MODEL_INSTRUCTION = ('Text extracted from a PDF: '
+                         '\n"""\n{}\n"""\n\n'
+                         'The text above was extracted from a PDF document. '
+                         'Can you make it nicely formatted? '
+                         'Please only return the formatted text '
+                         'without comments or added information.')
+    """Instructions to the model with python format braces for pdf text"""
+
     def __init__(self, fp, page_range=None, model=None):
         """
         Parameters
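Hoisting the prompt text into MODEL_ROLE and MODEL_INSTRUCTION class attributes (rather than strings hard-coded inside make_gpt_messages) lets subclasses swap the prompt without reimplementing the method. A hypothetical subclass sketch, not part of this commit:

    class PDFtoSpanishTXT(PDFtoTXT):
        # Hypothetical override: same cleaning pipeline, different instruction.
        MODEL_INSTRUCTION = ('Text extracted from a PDF:\n"""\n{}\n"""\n\n'
                             'Reformat this text nicely, translate it to '
                             'Spanish, and return only the formatted text.')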
@@ -68,14 +80,11 @@ def load_pdf(self, page_range):
                                  .format(i + 1 + page_range.start, len(pdf.pages)))
                 else:
                     out.append(page_text)
-                    logger.debug('Loaded page {} out of {}'
-                                 .format(i + 1 + page_range.start, len(pdf.pages)))

         logger.info('Finished loading PDF.')
         return out

-    @staticmethod
-    def make_gpt_messages(pdf_raw_text):
+    def make_gpt_messages(self, pdf_raw_text):
         """Make the chat completion messages list for input to GPT

         Parameters
@@ -91,16 +100,9 @@ def make_gpt_messages(pdf_raw_text):
                [{"role": "system", "content": "You do this..."},
                 {"role": "user", "content": "Please do this: {}"}]
         """
-        query = ('Text extracted from a PDF: '
-                 '\"\"\"\n{}\"\"\"\n\n'
-                 'The text above was extracted from a PDF document. '
-                 'Can you make it nicely formatted? '
-                 'Please only return the formatted text, nothing else.'
-                 .format(pdf_raw_text))
-
-        role_str = ('You clean up poorly formatted text '
-                    'extracted from PDF documents.')
-        messages = [{"role": "system", "content": role_str},
+
+        query = self.MODEL_INSTRUCTION.format(pdf_raw_text)
+        messages = [{"role": "system", "content": self.MODEL_ROLE},
                     {"role": "user", "content": query}]

         return messages
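The resulting list follows the OpenAI chat-completions message format, e.g. (a sketch; the file path and raw text are hypothetical):

    pdf = PDFtoTXT('./report.pdf')
    messages = pdf.make_gpt_messages('raw page text...')
    # [{'role': 'system', 'content': 'You clean up poorly formatted text ...'},
    #  {'role': 'user', 'content': 'Text extracted from a PDF: ...'}]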
@@ -147,14 +149,18 @@ def clean_txt(self):

         return clean_pages

-    async def clean_txt_async(self, rate_limit=40e3):
+    async def clean_txt_async(self, ignore_error=None, rate_limit=40e3):
         """Use GPT to clean raw pdf text in parallel calls to the OpenAI API.

         NOTE: you need to call this using the await command in ipython or
         jupyter, e.g.: `out = await PDFtoTXT.clean_txt_async()`

         Parameters
         ----------
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
         rate_limit : float
             OpenAI API rate limit (tokens / minute). Note that the
             gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -178,6 +184,7 @@ async def clean_txt_async(self, rate_limit=40e3):

         clean_pages = await self.call_api_async(self.URL, self.HEADERS,
                                                 all_request_jsons,
+                                                ignore_error=ignore_error,
                                                 rate_limit=rate_limit)

         for i, page in enumerate(clean_pages):
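Putting it together for PDF cleaning (a sketch; the path and predicate are hypothetical):

    # In ipython/jupyter:
    pdf = PDFtoTXT('./report.pdf')

    # Hypothetical policy: retry rate-limit errors, skip any other failure so
    # one bad page can't stall the whole document.
    clean = await pdf.clean_txt_async(
        ignore_error=lambda e: 'rate limit' not in e.lower(),
        rate_limit=40e3)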
13 changes: 11 additions & 2 deletions elm/summary.py
@@ -99,6 +99,8 @@ def run(self, temperature=0, fancy_combine=True):
             Summary of text.
         """

+        logger.info('Summarizing {} text chunks in serial...'
+                    .format(len(self.text_chunks)))
         summary = ''

         for i, chunk in enumerate(self.text_chunks):
@@ -115,10 +117,12 @@ def run(self, temperature=0, fancy_combine=True):
         if fancy_combine:
             summary = self.combine(summary)

+        logger.info('Finished all summaries.')
+
         return summary

-    async def run_async(self, temperature=0, rate_limit=40e3,
-                        fancy_combine=True):
+    async def run_async(self, temperature=0, ignore_error=None,
+                        rate_limit=40e3, fancy_combine=True):
         """Run text summary asynchronously for all text chunks

         NOTE: you need to call this using the await command in ipython or
@@ -130,6 +134,10 @@ async def run_async(self, temperature=0, rate_limit=40e3,
             GPT model temperature, a measure of response entropy from 0 to 1. 0
             is more reliable and nearly deterministic; 1 will give the model
             more creative freedom and may not return as factual of results.
+        ignore_error : None | callable
+            Optional callable to parse API error string. If the callable
+            returns True, the error will be ignored, the API call will not be
+            tried again, and the output will be an empty string.
         rate_limit : float
             OpenAI API rate limit (tokens / minute). Note that the
             gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
@@ -157,6 +165,7 @@ async def run_async(self, temperature=0, rate_limit=40e3,
         summaries = await self.generic_async_query(queries,
                                                    model_role=self.MODEL_ROLE,
                                                    temperature=temperature,
+                                                   ignore_error=ignore_error,
                                                    rate_limit=rate_limit)

         self.summary_chunks = summaries
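And the equivalent for summarization (a sketch; the Summary construction shown is abbreviated and hypothetical, and the error-string match is an assumption):

    summ = Summary(text)  # hypothetical: text chunks prepared elsewhere
    summary = await summ.run_async(
        ignore_error=lambda e: 'content_filter' in e,
        rate_limit=40e3,
        fancy_combine=True)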
