#!/usr/bin/env python
# -*- coding: utf-8 -*-
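"""Discover RSS/Atom feed URLs linked from a given web page (see find_feeds)."""
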
from __future__ import print_function
__version__ = "0.0.4"

try:
    __FEEDFINDER2_SETUP__
except NameError:
    __FEEDFINDER2_SETUP__ = False

if not __FEEDFINDER2_SETUP__:
    __all__ = ["find_feeds"]

    import logging
    import requests
    from bs4 import BeautifulSoup
    from six.moves.urllib import parse as urlparse


def coerce_url(url):
    url = url.strip()
    if url.startswith("feed://"):
        return "http://{0}".format(url[7:])
    for proto in ["http://", "https://"]:
        if url.startswith(proto):
            return url
    return "http://{0}".format(url)


class FeedFinder(object):

    def __init__(self, user_agent=None, timeout=None):
        if user_agent is None:
            user_agent = "feedfinder2/{0}".format(__version__)
        self.user_agent = user_agent
        self.timeout = timeout

    def get_feed(self, url):
        # Fetch the URL and return the response body, or None on any error.
        try:
            r = requests.get(url, headers={"User-Agent": self.user_agent},
                             timeout=self.timeout)
        except Exception as e:
            logging.warning("Error while getting '{0}'".format(url))
            logging.warning("{0}".format(e))
            return None
        return r.text

    def is_feed_data(self, text):
        # A document containing an <html> tag is assumed not to be a feed;
        # otherwise, count <rss>, <rdf>, and <feed> openings as feed evidence.
        data = text.lower()
        if data.count("<html"):
            return False
        return data.count("<rss") + data.count("<rdf") + data.count("<feed")

    def is_feed(self, url):
        text = self.get_feed(url)
        if text is None:
            return False
        return self.is_feed_data(text)

    def is_feed_url(self, url):
        return any(map(url.lower().endswith,
                       [".rss", ".rdf", ".xml", ".atom"]))

    def is_feedlike_url(self, url):
        return any(map(url.lower().count,
                       ["rss", "rdf", "xml", "atom", "feed"]))


def find_feeds(url, check_all=False, user_agent=None, timeout=None):
    finder = FeedFinder(user_agent=user_agent, timeout=timeout)

    # Format the URL properly.
    url = coerce_url(url)

    # Download the requested URL.
    text = finder.get_feed(url)
    if text is None:
        return []

    # Check if it is already a feed.
    if finder.is_feed_data(text):
        return [url]

    # Look for <link> tags.
    logging.info("Looking for <link> tags.")
    tree = BeautifulSoup(text, "html.parser")
    links = []
    for link in tree.find_all("link"):
        if link.get("type") in ["application/rss+xml",
                                "text/xml",
                                "application/atom+xml",
                                "application/x.atom+xml",
                                "application/x-atom+xml"]:
            links.append(urlparse.urljoin(url, link.get("href", "")))

    # Check the detected links.
    urls = list(filter(finder.is_feed, links))
    logging.info("Found {0} feed <link> tags.".format(len(urls)))
    if len(urls) and not check_all:
        return sort_urls(urls)

    # Look for <a> tags.
    logging.info("Looking for <a> tags.")
    local, remote = [], []
    for a in tree.find_all("a"):
        href = a.get("href", None)
        if href is None:
            continue
        if "://" not in href and finder.is_feed_url(href):
            local.append(href)
        if finder.is_feedlike_url(href):
            remote.append(href)

    # Check the local URLs.
    local = [urlparse.urljoin(url, l) for l in local]
    urls += list(filter(finder.is_feed, local))
    logging.info("Found {0} local <a> links to feeds.".format(len(urls)))
    if len(urls) and not check_all:
        return sort_urls(urls)

    # Check the remote URLs.
    remote = [urlparse.urljoin(url, l) for l in remote]
    urls += list(filter(finder.is_feed, remote))
    logging.info("Found {0} remote <a> links to feeds.".format(len(urls)))
    if len(urls) and not check_all:
        return sort_urls(urls)

    # Guess potential feed URLs as a last resort.
    fns = ["atom.xml", "index.atom", "index.rdf", "rss.xml", "index.xml",
           "index.rss"]
    urls += list(filter(finder.is_feed, [urlparse.urljoin(url, f)
                                         for f in fns]))

    return sort_urls(urls)


def url_feed_prob(url):
    # Heuristic score used to rank candidate feed URLs: comment feeds and
    # GeoRSS are penalized, and keywords earlier in ``kw`` score higher.
    if "comments" in url:
        return -2
    if "georss" in url:
        return -1
    kw = ["atom", "rss", "rdf", ".xml", "feed"]
    for p, t in zip(range(len(kw), 0, -1), kw):
        if t in url:
            return p
    return 0


def sort_urls(feeds):
    # De-duplicate and order the candidates from most to least feed-like.
    return sorted(list(set(feeds)), key=url_feed_prob, reverse=True)
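
# For illustration (hypothetical URLs): given
# ["http://example.com/comments/feed", "http://example.com/atom.xml"],
# sort_urls ranks the atom URL first, since url_feed_prob scores it 5
# and scores the comment feed -2.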


if __name__ == "__main__":
    print(find_feeds("www.preposterousuniverse.com/blog/", timeout=1))
    print(find_feeds("www.preposterousuniverse.com/blog/"))
    print(find_feeds("http://xkcd.com"))
    print(find_feeds("dan.iel.fm/atom.xml"))
    print(find_feeds("dan.iel.fm", check_all=True))
    print(find_feeds("kapadia.github.io"))
    print(find_feeds("blog.jonathansick.ca"))
    print(find_feeds("asdasd"))