Skip to content

Commit

Permalink
Add backoff for fetching feeds that are down
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Apr 9, 2021
1 parent 794d8e1 commit 5efba56
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 22 deletions.
2 changes: 2 additions & 0 deletions base-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Feed update interval in minutes
update_interval: 60
# Maximum backoff in minutes when failing to fetch feeds (defaults to 5 days)
max_backoff: 7200
# The time to sleep between send requests when broadcasting a new feed entry.
# Set to 0 to disable sleep or -1 to run all requests asynchronously at once.
spam_sleep: 2
Expand Down
2 changes: 1 addition & 1 deletion maubot.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
maubot: 0.1.0
id: xyz.maubot.rss
version: 0.2.3
version: 0.2.4
license: AGPL-3.0-or-later
modules:
- rss
Expand Down
37 changes: 27 additions & 10 deletions rss/bot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# rss - A maubot plugin to subscribe to RSS/Atom feeds.
# Copyright (C) 2020 Tulir Asokan
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -18,6 +18,7 @@
from time import mktime, time
from string import Template
import asyncio
import time

import aiohttp
import hashlib
Expand All @@ -31,10 +32,13 @@

from .db import Database, Feed, Entry, Subscription

rss_change_level = EventType.find("xyz.maubot.rss", t_class=EventType.Class.STATE)


class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("update_interval")
helper.copy("max_backoff")
helper.copy("spam_sleep")
helper.copy("command_prefix")
helper.copy("admins")
Expand Down Expand Up @@ -108,14 +112,25 @@ async def _poll_once(self) -> None:
subs = self.db.get_feeds()
if not subs:
return
for res in asyncio.as_completed([self.try_parse_feed(feed=feed) for feed in subs]):
now = int(time.time())
tasks = [self.try_parse_feed(feed=feed) for feed in subs if feed.next_retry < now]
feed: Feed
entries: Iterable[Entry]
for res in asyncio.as_completed(tasks):
feed, entries = await res
if not entries:
error_count = feed.error_count + 1
next_retry_delay = self.config["update_interval"] * 60 * error_count
next_retry_delay = min(next_retry_delay, self.config["max_backoff"] * 60)
next_retry = int(time.time() + next_retry_delay)
self.db.set_backoff(feed, error_count, next_retry)
continue
elif feed.error_count > 0:
self.db.set_backoff(feed, error_count=0, next_retry=0)
try:
new_entries = {entry.id: entry for entry in entries}
except Exception:
self.log.exception(f"Error items of {feed.url}")
self.log.exception(f"Weird error in items of {feed.url}")
continue
for old_entry in self.db.get_entries(feed.id):
new_entries.pop(old_entry.id, None)
Expand All @@ -137,16 +152,16 @@ async def _poll_feeds(self) -> None:
async def try_parse_feed(self, feed: Optional[Feed] = None) -> Tuple[Feed, Iterable[Entry]]:
try:
return await self.parse_feed(feed=feed)
except Exception:
self.log.exception(f"Failed to parse feed {feed.id} / {feed.url}")
except Exception as e:
self.log.warning(f"Failed to parse feed {feed.id} / {feed.url}: {e}")
return feed, []

async def parse_feed(self, *, feed: Optional[Feed] = None, url: Optional[str] = None
) -> Tuple[Feed, Iterable[Entry]]:
if feed is None:
if url is None:
raise ValueError("Either feed or url must be set")
feed = Feed(-1, url, "", "", "", [])
feed = Feed(-1, url, "", "", "", 0, 0, [])
elif url is not None:
raise ValueError("Only one of feed or url must be set")
resp = await self.http.get(feed.url)
Expand All @@ -167,7 +182,7 @@ async def _parse_json(cls, feed: Feed, resp: aiohttp.ClientResponse
raise ValueError("Feed is not a valid JSON feed (items is not a list)")
feed = Feed(id=feed.id, title=content["title"], subtitle=content.get("subtitle", ""),
url=feed.url, link=content.get("home_page_url", ""),
subscriptions=feed.subscriptions)
next_retry=0, error_count=0, subscriptions=feed.subscriptions)
return feed, (cls._parse_json_entry(feed.id, entry) for entry in content["items"])

@classmethod
Expand Down Expand Up @@ -203,7 +218,7 @@ async def _parse_rss(cls, feed: Feed, resp: aiohttp.ClientResponse
feed_data = parsed_data.get("feed", {})
feed = Feed(id=feed.id, url=feed.url, title=feed_data.get("title", feed.url),
subtitle=feed_data.get("description", ""), link=feed_data.get("link", ""),
subscriptions=feed.subscriptions)
error_count=0, next_retry=0, subscriptions=feed.subscriptions)
return feed, (cls._parse_rss_entry(feed.id, entry) for entry in parsed_data.entries)

@classmethod
Expand Down Expand Up @@ -249,8 +264,8 @@ async def can_manage(self, evt: MessageEvent) -> bool:
return True
levels = await self.get_power_levels(evt.room_id)
user_level = levels.get_user_level(evt.sender)
state_level = levels.events.get("xyz.maubot.rss", levels.state_default)
if type(state_level) != int:
state_level = levels.get_event_level(rss_change_level)
if not isinstance(state_level, int):
state_level = 50
if user_level < state_level:
await evt.reply("You don't have the permission to "
Expand Down Expand Up @@ -278,6 +293,8 @@ async def subscribe(self, evt: MessageEvent, url: str) -> None:
return
feed = self.db.create_feed(info)
self.db.add_entries(entries, override_feed_id=feed.id)
elif feed.error_count > 0:
self.db.set_backoff(feed, error_count=feed.error_count, next_retry=0)
self.db.subscribe(feed.id, evt.room_id, evt.sender)
await evt.reply(f"Subscribed to feed ID {feed.id}: [{feed.title}]({feed.url})")

Expand Down
38 changes: 27 additions & 11 deletions rss/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

Subscription = NamedTuple("Subscription", feed_id=int, room_id=RoomID, user_id=UserID,
notification_template=Template, send_notice=bool)
Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str,
subscriptions=List[Subscription])
Feed = NamedTuple("Feed", id=int, url=str, title=str, subtitle=str, link=str, next_retry=int,
error_count=int, subscriptions=List[Subscription])
Entry = NamedTuple("Entry", feed_id=int, id=str, date=datetime, title=str, summary=str, link=str)


Expand All @@ -46,7 +46,9 @@ def __init__(self, db: Engine) -> None:
Column("url", Text, nullable=False, unique=True),
Column("title", Text, nullable=False),
Column("subtitle", Text, nullable=False),
Column("link", Text, nullable=False))
Column("link", Text, nullable=False),
Column("next_retry", Integer, nullable=False),
Column("error_count", Integer, nullable=False))
self.subscription = Table("subscription", metadata,
Column("feed_id", Integer, ForeignKey("feed.id"),
primary_key=True),
Expand Down Expand Up @@ -104,6 +106,10 @@ def upgrade(self) -> None:
if version == 1:
self.db.execute("ALTER TABLE subscription ADD COLUMN send_notice BOOLEAN DEFAULT true")
version = 2
if version == 2:
self.db.execute("ALTER TABLE feed ADD COLUMN next_retry BIGINT DEFAULT 0")
self.db.execute("ALTER TABLE feed ADD COLUMN error_count BIGINT DEFAULT 0")
version = 3
self.db.execute(self.version.delete())
self.db.execute(self.version.insert().values(version=version))

Expand All @@ -116,18 +122,21 @@ def get_feeds(self) -> Iterable[Feed]:
.where(self.subscription.c.feed_id == self.feed.c.id))
map: Dict[int, Feed] = {}
for row in rows:
(feed_id, url, title, subtitle, link,
(feed_id, url, title, subtitle, link, next_retry, error_count,
room_id, user_id, notification_template, send_notice) = row
map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, subscriptions=[]))
map.setdefault(feed_id, Feed(feed_id, url, title, subtitle, link, next_retry,
error_count, subscriptions=[]))
map[feed_id].subscriptions.append(
Subscription(feed_id=feed_id, room_id=room_id, user_id=user_id,
notification_template=Template(notification_template),
send_notice=send_notice))
return map.values()

def get_feeds_by_room(self, room_id: RoomID) -> Iterable[Tuple[Feed, UserID]]:
return ((Feed(feed_id, url, title, subtitle, link, subscriptions=[]), user_id)
for (feed_id, url, title, subtitle, link, user_id) in
return ((Feed(feed_id, url, title, subtitle, link, next_retry, error_count,
subscriptions=[]),
user_id)
for (feed_id, url, title, subtitle, link, next_retry, error_count, user_id) in
self.db.execute(select([self.feed, self.subscription.c.user_id])
.where(and_(self.subscription.c.room_id == room_id,
self.subscription.c.feed_id == self.feed.c.id))))
Expand Down Expand Up @@ -174,12 +183,12 @@ def get_subscription(self, feed_id: int, room_id: RoomID) -> Tuple[Optional[Subs
.where(and_(tbl.c.feed_id == feed_id, tbl.c.room_id == room_id,
self.feed.c.id == feed_id)))
try:
(feed_id, url, title, subtitle, link,
(feed_id, url, title, subtitle, link, next_retry, error_count,
room_id, user_id, template, send_notice) = next(rows)
notification_template = Template(template)
return (Subscription(feed_id, room_id, user_id, notification_template, send_notice)
if room_id else None,
Feed(feed_id, url, title, subtitle, link, []))
Feed(feed_id, url, title, subtitle, link, next_retry, error_count, []))
except (ValueError, StopIteration):
return None, None

Expand All @@ -190,9 +199,16 @@ def update_room_id(self, old: RoomID, new: RoomID) -> None:

def create_feed(self, info: Feed) -> Feed:
res = self.db.execute(self.feed.insert().values(url=info.url, title=info.title,
subtitle=info.subtitle, link=info.link))
subtitle=info.subtitle, link=info.link,
next_retry=info.next_retry))
return Feed(id=res.inserted_primary_key[0], url=info.url, title=info.title,
subtitle=info.subtitle, link=info.link, subscriptions=[])
subtitle=info.subtitle, link=info.link, next_retry=info.next_retry,
error_count=info.error_count, subscriptions=[])

def set_backoff(self, info: Feed, error_count: int, next_retry: int) -> None:
self.db.execute(self.feed.update()
.where(self.feed.c.id == info.id)
.values(error_count=error_count, next_retry=next_retry))

def subscribe(self, feed_id: int, room_id: RoomID, user_id: UserID) -> None:
self.db.execute(self.subscription.insert().values(
Expand Down

0 comments on commit 5efba56

Please sign in to comment.