-
Notifications
You must be signed in to change notification settings - Fork 0
/
tweepy_streamer.py
145 lines (110 loc) · 4.72 KB
/
tweepy_streamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from tweepy import API
from tweepy import Cursor
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from textblob import TextBlob
import twitter_credentials
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import re
# # # # TWITTER CLIENT # # # #
class TwitterClient():
    """
    Convenience wrapper around an authenticated tweepy ``API`` object.

    ``twitter_user`` is forwarded as the ``id`` keyword to every cursor-based
    query below; it defaults to ``None``.
    """

    def __init__(self, twitter_user=None):
        # Build the OAuth handler first, then bind an API client to it.
        self.auth = TwitterAuthenticator().authenticate_twitter_app()
        self.twitter_client = API(self.auth)
        self.twitter_user = twitter_user

    def get_twitter_client_api(self):
        """Return the underlying API object created in ``__init__``."""
        return self.twitter_client

    def get_user_timeline_tweets(self, num_tweets):
        """Return a list of at most ``num_tweets`` items from ``user_timeline``."""
        timeline = Cursor(self.twitter_client.user_timeline, id=self.twitter_user)
        return list(timeline.items(num_tweets))

    def get_friend_list(self, num_friends):
        """Return a list of at most ``num_friends`` items from ``friends``."""
        friends = Cursor(self.twitter_client.friends, id=self.twitter_user)
        return list(friends.items(num_friends))

    def get_home_timeline_tweets(self, num_tweets):
        """Return a list of at most ``num_tweets`` items from ``home_timeline``."""
        # NOTE(review): ``id`` is passed here just like in the other queries;
        # confirm that the home-timeline endpoint actually honours it.
        home = Cursor(self.twitter_client.home_timeline, id=self.twitter_user)
        return list(home.items(num_tweets))
# # # # TWITTER AUTHENTICATOR # # # #
class TwitterAuthenticator():
    """
    Creates the OAuth handler from the values in ``twitter_credentials``.
    """

    def authenticate_twitter_app(self):
        """Return an ``OAuthHandler`` with consumer keys and access token set."""
        creds = twitter_credentials
        handler = OAuthHandler(creds.CONSUMER_KEY, creds.CONSUMER_SECRET)
        handler.set_access_token(creds.ACCESS_TOKEN, creds.ACCESS_TOKEN_SECRET)
        return handler
# # # # TWITTER STREAMER # # # #
class TwitterStreamer():
    """
    Class for streaming and processing live tweets.
    """

    def __init__(self):
        # Attribute name (including its spelling) is kept as-is for callers.
        self.twitter_autenticator = TwitterAuthenticator()

    def stream_tweets(self, fetched_tweets_filename, hash_tag_list):
        """
        Open a stream and filter it by the given tracking terms.

        fetched_tweets_filename: path handed to the ``TwitterListener``.
        hash_tag_list: passed to ``Stream.filter`` as ``track``.
        """
        # Listener that receives the stream data, then the OAuth handler
        # used for the connection to the Twitter Streaming API.
        tweet_listener = TwitterListener(fetched_tweets_filename)
        oauth = self.twitter_autenticator.authenticate_twitter_app()

        # Restrict the stream to messages matching the given keywords.
        Stream(oauth, tweet_listener).filter(track=hash_tag_list)
# # # # TWITTER STREAM LISTENER # # # #
class TwitterListener(StreamListener):
    """
    Basic listener that prints each received message to stdout and appends
    it to a file.
    """

    def __init__(self, fetched_tweets_filename):
        """
        fetched_tweets_filename: path of the file that received data is
        appended to.
        """
        # Let the base listener initialise its own state before adding ours.
        super().__init__()
        self.fetched_tweets_filename = fetched_tweets_filename

    def on_data(self, data):
        """
        Print ``data`` and append it to the output file.

        Always returns True, even when printing or writing fails; the
        failure is reported on stdout instead of being raised.
        """
        try:
            print(data)
            # Explicit UTF-8 so non-ASCII tweet text does not depend on the
            # platform's default encoding.
            with open(self.fetched_tweets_filename, 'a', encoding='utf-8') as tf:
                tf.write(data)
        except Exception as e:
            # Exception (not BaseException) so that KeyboardInterrupt and
            # SystemExit still propagate and can stop the program.
            print("Error on_data %s" % str(e))
        return True

    def on_error(self, status):
        """
        Return False for status 420 (rate limiting); otherwise print the
        status and return None.
        """
        if status == 420:
            # Returning False from on_error in case a rate limit occurs.
            return False
        print(status)
class TweetAnalyzer():
    """
    Functionality for analyzing and categorizing content from tweets.
    """

    def clean_tweet(self, tweet):
        """
        Return ``tweet`` with @mentions, URLs and every character other than
        ASCII letters, digits, spaces and tabs removed, and with runs of
        whitespace collapsed to single spaces.
        """
        # Raw string: \w, \/ and \S must reach the regex engine unchanged
        # instead of being treated as (invalid) string escape sequences.
        pattern = r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)"
        return ' '.join(re.sub(pattern, " ", tweet).split())

    def analyze_sentiment(self, tweet):
        """
        Classify the cleaned tweet by TextBlob polarity.

        Returns 1 for positive polarity, 0 for exactly zero, -1 otherwise.
        """
        polarity = TextBlob(self.clean_tweet(tweet)).sentiment.polarity
        if polarity > 0:
            return 1
        elif polarity == 0:
            return 0
        else:
            return -1

    def tweets_to_data_frame(self, tweets):
        """
        Build a DataFrame with one row per tweet.

        Columns: 'tweets' (text), 'id', 'len' (length of the text), 'date'
        (created_at), 'source', 'likes' (favorite_count) and 'retweets'
        (retweet_count), each read from the attributes of the tweet objects.
        """
        df = pd.DataFrame(data=[tweet.text for tweet in tweets], columns=['tweets'])

        df['id'] = np.array([tweet.id for tweet in tweets])
        df['len'] = np.array([len(tweet.text) for tweet in tweets])
        df['date'] = np.array([tweet.created_at for tweet in tweets])
        df['source'] = np.array([tweet.source for tweet in tweets])
        df['likes'] = np.array([tweet.favorite_count for tweet in tweets])
        df['retweets'] = np.array([tweet.retweet_count for tweet in tweets])

        return df
if __name__ == '__main__':
    # Disabled example: fetch a user's timeline and score each tweet's sentiment.
    # twitter_client = TwitterClient()
    # tweet_analyzer = TweetAnalyzer()
    # api = twitter_client.get_twitter_client_api()
    # tweets = api.user_timeline(screen_name="realDonaldTrump", count=200)
    # df = tweet_analyzer.tweets_to_data_frame(tweets)
    # df['sentiment'] = np.array([tweet_analyzer.analyze_sentiment(tweet) for tweet in df['tweets']])
    # print(df.head(10))

    # Stream tweets matching the tracking terms and append them to a file.
    fetched_tweets_filename, hash_tag_list = "tweets.txt", ["Donald Trump"]
    twitter_streamer = TwitterStreamer()
    twitter_streamer.stream_tweets(fetched_tweets_filename, hash_tag_list)