-
Notifications
You must be signed in to change notification settings - Fork 1
/
onsite_helper.py
131 lines (121 loc) · 4.45 KB
/
onsite_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os
import json
import boto3
import logging
import requests
from openai import OpenAI
from datetime import datetime
import azure.functions as func
from twilio.rest import Client
#------------------------------------#
# Load environment variables
#------------------------------------#
ACCOUNT_SID = os.environ["ACCOUNT_SID"]
AUTH_TOKEN = os.environ["AUTH_TOKEN"]
SEC_PIN = os.environ["SECURITY_PIN"]
#------------------------------------#
# Twilio Client
#------------------------------------#
CLIENT = Client(ACCOUNT_SID, AUTH_TOKEN)
#------------------------------------#
# Initialize SQS client
#------------------------------------#
sqs = boto3.client('sqs')
queue_url = os.environ["AMAZON_QUEUE_ENDPOINT"]
def save_sms_to_sqs(sender, message, date_sent=None):
"""
Save SMS message details into SQS queue.
"""
try:
message_body = {
'sender': sender,
'message': message,
'date_sent': date_sent if date_sent else datetime.utcnow().isoformat()
}
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message_body)
)
return f"Message from {sender} saved successfully."
except Exception as e:
logging.error(f"Error saving message to SQS: {e}")
return f"Error saving message. {e}"
#------------------------------------#
# Security check
#------------------------------------#
def process_incoming_message(PIN, incoming_message, send_to, send_from, image_urls, audio_urls):
"""
Generate a reply based on the incoming message.
Parameters
----------
PIN : str
Security PIN
incoming_message : str
Incoming message from Twilio
send_to : str
Twilio number
send_from : str
Who was this request send from & person to whom we are replying
image_urls : list
A list of twilio media urls for images, to be processed using vision AI
audio_urls : list
A list of audio media urls for audio, to be transcribed & processed as text on-site
Returns
-------
message : str
Reply message
"""
if incoming_message.strip() == PIN:
return """Welcome to the new internal Hess Services SMS AI service.
- I can lookup general information about jobs, parts, parts where used, inventory on hand, serial numbers, etc.
- I can also email you.
- Text 'about' to see this message again"""
else:
messages = CLIENT.messages.list(from_=send_to, to=send_from)
sent_pin = False
for message in messages:
if message.body.strip() == PIN:
sent_pin = True
if sent_pin:
follow_up_reply = get_follow_up_text(send_to=send_to,
send_from=send_from,
incoming_message=incoming_message,
image_urls=image_urls,
audio_urls=audio_urls)
return follow_up_reply
else:
return f"Please provide security PIN to continue."
#------------------------------------#
# Follow up text
#------------------------------------#
def get_follow_up_text(send_to, send_from, incoming_message, image_urls, audio_urls):
"""Send follow up text
Parameters
----------
send_to : str
Phone number to send text to
send_from : str
Phone number to send text from
incoming_message : str
Incoming message from Twilio
image_urls : list
list of image urls
audio_urls : list
list of audio urls
Returns
-------
message : str
Response from the AI to the user
"""
if incoming_message == 'about':
return """Welcome to the new internal Hess Services SMS AI service.
- I can lookup general information about jobs, parts, parts where used, inventory on hand, serial numbers, etc.
- I can also email you.
- Text 'about' to see this message again"""
else:
# Message to save is then processed on site & contents of lists
# are extracted using regex (extracting contents of first set of [] and second set of []
message_to_save = f"{image_urls} {audio_urls} {incoming_message}"
result = save_sms_to_sqs(send_to, message_to_save)
# Receive incoming messages without sending a reply
return ''