-
Notifications
You must be signed in to change notification settings - Fork 1
/
ACEBot.py
141 lines (139 loc) · 5.91 KB
/
ACEBot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
from slackclient import SlackClient
import re
import pprint
import sys
import spotipy
import spotipy.util as util
import html
class acebot:
    """Collect song links posted in Slack and add them to a Spotify playlist.

    Slack responses (channel list, channel history, thread replies) are cached
    in memory on the instance; the ``refresh*``/``update*`` methods re-fetch
    them. Spotify access goes through the ``spotipy`` client stored in
    ``self.sp`` and Slack access through the ``SlackClient`` stored in
    ``self.sc``.
    """

    # Largest number of tracks sent in a single playlist add/remove request.
    # NOTE(review): 100 matches the documented Spotify Web API limit - confirm.
    _SPOTIFY_BATCH_SIZE = 100

    def __init__(self):
        """Create the Spotify and Slack clients and prime the channel cache.

        Raises:
            KeyError: if the ``SLACK_API_TOKEN`` environment variable is unset.
        """
        self.username = 'Alexmooo'
        self.scope = 'playlist-modify-public'
        self.spotify_token = util.prompt_for_user_token(
            self.username, self.scope)
        self.sp = spotipy.Spotify(self.spotify_token)
        slack_token = os.environ["SLACK_API_TOKEN"]
        self.sc = SlackClient(slack_token)
        # replies[CID][ts] -> raw "conversations.replies" response
        self.replies = {}
        # history[CID] -> "conversations.history" response (all pages merged)
        self.history = {}
        # channelIDs[name] -> channel id
        self.channelIDs = {}
        # Track ids collected by scrapeACETunes.
        self.aceTunesURI = set()
        # Sets self.channelList and fills self.channelIDs with one request,
        # so the first getChannelID() call does not have to fetch it again.
        self.updateChannelList()

    def apiCall(self, method, **kwargs):
        """Call a Slack Web API method and return the raw response.

        A response that is not flagged ``ok`` has its error printed; it is
        still returned so the caller can decide what to do with it.
        """
        response = self.sc.api_call(method, **kwargs)
        if not response.get('ok'):
            print(response.get('error', 'unknown Slack API error'))
        return response

    def updateChannelList(self):
        """Re-fetch the non-archived channel list and rebuild the name -> id map."""
        self.channelList = self.apiCall("conversations.list",
                                        exclude_archived=1)
        # A failed call carries no 'channels' key; treat it as an empty list.
        for channel in self.channelList.get('channels', []):
            self.channelIDs[channel['name']] = channel['id']
        return None

    def getChannelID(self, name):
        """Return the id of channel *name*, or None when it is unknown.

        The channel list is refreshed once if the name is not cached yet.
        """
        if name not in self.channelIDs:
            self.updateChannelList()
        return self.channelIDs.get(name)

    def refreshConversationHistory(self, CID):
        """Fetch the history of channel *CID* and cache it in ``self.history``.

        While a page reports ``has_more`` and supplies a
        ``response_metadata.next_cursor``, the next page is requested and its
        messages are appended, so the cached response's ``'messages'`` list
        holds every page. A failed first call is cached with an empty list.
        """
        response = self.apiCall("conversations.history", channel=CID)
        messages = list(response.get('messages', []))
        page = response
        while page.get('has_more'):
            cursor = page.get('response_metadata', {}).get('next_cursor')
            if not cursor:
                break
            page = self.apiCall("conversations.history",
                                channel=CID, cursor=cursor)
            messages.extend(page.get('messages', []))
        response['messages'] = messages
        self.history[CID] = response

    def getConversationHistory(self, CID):
        """Return the cached history response for *CID*, fetching it if needed."""
        if CID not in self.history:
            self.refreshConversationHistory(CID)
        return self.history[CID]

    def hasReply(self, message):
        """Return True when *message* carries a ``reply_count`` field."""
        return 'reply_count' in message

    def refreshReply(self, message, CID):
        """Fetch the thread rooted at *message* and cache the raw response."""
        ts = message['ts']
        self.replies.setdefault(CID, {})[ts] = self.apiCall(
            "conversations.replies", channel=CID, ts=ts)

    def getReply(self, message, CID):
        """Return the cached thread response for *message*, fetching if needed."""
        ts = message['ts']
        if ts not in self.replies.get(CID, {}):
            self.refreshReply(message, CID)
        return self.replies[CID][ts]

    def getReplies(self, message, CID):
        """Yield each message dict of the thread rooted at *message*."""
        # The response is a dict; the thread's messages live under 'messages'.
        yield from self.getReply(message, CID).get('messages', [])

    def youtubeSearch(self, url):
        """Search Spotify for a video title and return the best track id.

        Despite the parameter name, *url* is the attachment title (see
        getURI). It is HTML-unescaped and split into words, and one-character
        words are dropped. The full phrase is searched first, then
        progressively shorter prefixes, until a search returns a track.

        Returns:
            The id of the first matching track, or None when nothing matches.
        """
        words = [w for w in html.unescape(url or '').split() if len(w) > 1]
        for end in range(len(words), 0, -1):
            result = self.sp.search(' '.join(words[:end]), limit=1, market='US')
            items = result.get('tracks', {}).get('items') or []
            if items:
                return items[0]['id']
        return None

    def getURI(self, url):
        """Resolve a ``[link, title]`` pair to a Spotify track id.

        Spotify links yield the path segment after ``track/`` (any ``?query``
        is stripped). Links containing ``youtu`` are resolved by searching
        Spotify for the title. Anything else - including Spotify links that
        are not track links - yields None.
        """
        link = url[0]
        if 'spotify' in link:
            match = re.search(r'(?<=track/)[^?\n]+', link)
            return match.group() if match else None
        if 'youtu' in link:
            return self.youtubeSearch(url[1])
        return None

    def getAttachmentLinks(self, message):
        """Yield ``[original_url, title]`` for each linked attachment of *message*.

        Attachments without an ``original_url`` are skipped; a missing title
        is reported as an empty string.
        """
        for attachment in message.get('attachments', []):
            link = attachment.get('original_url')
            if link:
                yield [link, attachment.get('title') or '']

    def gatherReplyHistory(self, history, CID):
        """Pre-fetch the thread of every threaded message in ``history[CID]``."""
        for message in history[CID].get('messages', []):
            if self.hasReply(message):
                self.getReply(message, CID)

    def iterateFullHistory(self, CID):
        """Yield every message dict of channel *CID*, expanding threads.

        A message with replies is replaced by the messages of its thread.
        NOTE(review): this relies on "conversations.replies" returning the
        parent message as part of the thread - confirm against the Slack docs.
        If the thread comes back empty, the parent itself is yielded instead.
        """
        for message in self.getConversationHistory(CID).get('messages', []):
            thread = None
            if self.hasReply(message):
                thread = self.getReply(message, CID).get('messages')
            if thread:
                yield from thread
            else:
                yield message

    def iterateAttachmentLinks(self, CID):
        """Yield ``[original_url, title]`` for every attachment in channel *CID*."""
        for message in self.iterateFullHistory(CID):
            yield from self.getAttachmentLinks(message)

    def scrapeACETunes(self, channelName='ace-tunes', username='alexmooo',
                       playlist_id='23FVTo7hhO05iNAqlVxGGu'):
        """Collect track ids from a channel's links and add them to a playlist.

        Links that cannot be resolved to a track are printed. The collected
        ids accumulate in ``self.aceTunesURI`` and are added in one go after
        the whole channel has been scanned.

        Args:
            channelName: Slack channel to scan.
            username: Spotify user id that owns the playlist.
            playlist_id: Spotify playlist to add the tracks to.
        """
        CID = self.getChannelID(channelName)
        if CID is None:
            print("Channel not found: " + channelName)
            return None
        for url in self.iterateAttachmentLinks(CID):
            uri = self.getURI(url)
            if uri:
                self.aceTunesURI.add(uri)
            else:
                print("Not Matched: " + url[0] + " Title: " + url[1])
        # Sorted so the request order is deterministic across runs.
        self.addSongToPlaylist(username, playlist_id, sorted(self.aceTunesURI))
        return None

    def getEmojiRanking(self, channelName):
        """Return ``(emoji_name, total_count)`` pairs sorted by ascending count.

        Counts are summed over the ``reactions`` of every message in the
        channel, thread replies included. An unknown channel yields ``[]``.
        """
        CID = self.getChannelID(channelName)
        if CID is None:
            return []
        totals = {}
        for message in self.iterateFullHistory(CID):
            for react in message.get('reactions', []):
                name = react['name']
                totals[name] = totals.get(name, 0) + react['count']
        return sorted(totals.items(), key=lambda pair: pair[1])

    @staticmethod
    def _asTrackList(track_id):
        """Normalize None, a single string, or any iterable into a list of tracks."""
        if track_id is None:
            return []
        if isinstance(track_id, str):
            return [track_id]
        return list(track_id)

    @staticmethod
    def _bareTrackID(track):
        """Reduce a track id, ``spotify:track:`` URI, or track URL to the bare id."""
        bare = track.split('?', 1)[0].rstrip('/')
        return bare.replace('/', ':').rsplit(':', 1)[-1]

    @classmethod
    def _batches(cls, items):
        """Yield consecutive slices of *items* no longer than the batch size."""
        size = cls._SPOTIFY_BATCH_SIZE
        for start in range(0, len(items), size):
            yield items[start:start + size]

    def checkAndRemoveDuplicateSong(self, username, playlist_id, track_id, scope='playlist-modify-public'):
        """Remove from the playlist every given track that is already in it.

        Every page of the playlist is read (following ``next`` links) and each
        supplied track is compared by bare track id, so ids, URIs and URLs all
        match. Playlist entries without track data are ignored.

        Args:
            username: Spotify user id that owns the playlist.
            playlist_id: Playlist to clean.
            track_id: One track or an iterable of tracks (id, URI or URL).
            scope: Unused; kept for interface compatibility.

        Returns:
            The response of the last removal request, or None when none of
            the tracks was present.
        """
        wanted = self._asTrackList(track_id)
        if not wanted:
            return None
        present = set()
        page = self.sp.user_playlist_tracks(username, playlist_id, limit=None)
        while page:
            for item in page.get('items', []):
                uri = (item.get('track') or {}).get('uri')
                if uri:
                    present.add(self._bareTrackID(uri))
            page = self.sp.next(page) if page.get('next') else None
        duplicates = [t for t in wanted if self._bareTrackID(t) in present]
        results = None
        self.sp.trace = False
        for batch in self._batches(duplicates):
            results = self.sp.user_playlist_remove_all_occurrences_of_tracks(
                username, playlist_id, batch)
        return results

    def addSongToPlaylist(self, username, playlist_id, track_id=None, scope='playlist-modify-public'):
        """Add tracks to a playlist, first removing any copies already there.

        Args:
            username: Spotify user id (the id, not the URI) owning the playlist.
            playlist_id: Playlist to add to.
            track_id: One track or an iterable (list, set, ...) of tracks,
                each given as id, URI or URL. None or empty is a no-op.
            scope: Unused; kept for interface compatibility.
        """
        tracks = self._asTrackList(track_id)
        if not tracks:
            return None
        self.checkAndRemoveDuplicateSong(username, playlist_id, tracks)
        self.sp.trace = False
        # Sent in batches so one request never exceeds the batch size.
        for batch in self._batches(tracks):
            self.sp.user_playlist_add_tracks(
                user=username, playlist_id=playlist_id, tracks=batch)
        return None
# TODO: nice documentation for each function :^)
# TODO: spotify token refresh?