-
Notifications
You must be signed in to change notification settings - Fork 0
/
call-apis.py
30 lines (26 loc) · 860 Bytes
/
call-apis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from kafka import KafkaProducer
import json
import praw
import os
from dotenv import load_dotenv
# Load the environment variables from .env file
load_dotenv()
# Setup the Reddit connection
reddit = praw.Reddit(
client_id=os.getenv('REDDIT_CLIENT_ID'),
client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
user_agent=os.getenv('REDDIT_USER_AGENT')
)
# Setup Kafka producer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda m: json.dumps(m).encode('ascii'))
# Fetch and send data to Kafka
for submission in reddit.subreddit('worldnews').stream.submissions():
message = {
'title': submission.title,
'id': submission.id,
'created_utc': submission.created_utc,
'upvotes': submission.score
}
producer.send('worldnews', message)
producer.flush()