Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use priority queue for job processing #486

Merged
merged 11 commits into from
Jul 24, 2019
18 changes: 17 additions & 1 deletion securedrop_client/api_jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
from PyQt5.QtCore import QObject, pyqtSignal
from sdclientapi import API, RequestTimeoutError, AuthError
from sqlalchemy.orm.session import Session
from typing import Any, Optional
from typing import Any, Optional, TypeVar


logger = logging.getLogger(__name__)

DEFAULT_NUM_ATTEMPTS = 5

ApiJobType = TypeVar('ApiJobType', bound='ApiJob')


class ApiInaccessibleError(Exception):

Expand All @@ -35,6 +37,20 @@ class ApiJob(QObject):
def __init__(self, remaining_attempts: int = DEFAULT_NUM_ATTEMPTS) -> None:
super().__init__(None) # `None` because the QOjbect has no parent
self.remaining_attempts = remaining_attempts
self.order_number = None # type: Optional[int]

def __lt__(self, other: ApiJobType) -> bool:
'''
Python's PriorityQueue requires that ApiJobs are sortable as it
retrieves the next job using sorted(list(entries))[0].

For ApiJobs that have equal priority, we need to use the order_number key
to break ties to ensure that objects are retrieved in FIFO order.
'''
if self.order_number is None or other.order_number is None:
raise ValueError('cannot compare jobs without order_number!')

return self.order_number < other.order_number

def _do_call_api(self, api_client: API, session: Session) -> None:
if not api_client:
Expand Down
7 changes: 7 additions & 0 deletions securedrop_client/gui/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,13 @@ def __init__(
self.setLayout(main_layout)
self.update_conversation(self.source.collection)

# Refresh the session to update any replies that failed from a network timeout
self.controller.reply_succeeded.connect(self.refresh_conversation)

def refresh_conversation(self):
self.controller.session.refresh(self.source)
self.update_conversation(self.source.collection)

def clear_conversation(self):
while self.conversation_layout.count():
child = self.conversation_layout.takeAt(0)
Expand Down
65 changes: 51 additions & 14 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import itertools
import logging

from PyQt5.QtCore import QObject, QThread, pyqtSlot
from queue import Queue
from queue import PriorityQueue
from sdclientapi import API, RequestTimeoutError
from sqlalchemy.orm import scoped_session
from typing import Optional # noqa: F401
from typing import Optional, Tuple # noqa: F401

from securedrop_client.api_jobs.base import ApiJob, ApiInaccessibleError, DEFAULT_NUM_ATTEMPTS
from securedrop_client.api_jobs.downloads import FileDownloadJob
from securedrop_client.api_jobs.downloads import (FileDownloadJob, MessageDownloadJob,
ReplyDownloadJob)
from securedrop_client.api_jobs.uploads import SendReplyJob
from securedrop_client.api_jobs.updatestar import UpdateStarJob


logger = logging.getLogger(__name__)
Expand All @@ -19,8 +23,27 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None:
super().__init__()
self.api_client = api_client
self.session_maker = session_maker
self.queue = Queue() # type: Queue[ApiJob]
self.last_job = None # type: Optional[ApiJob]
self.queue = PriorityQueue() # type: PriorityQueue[Tuple[int, ApiJob]]

# One of the challenges of using Python's PriorityQueue is that
# for objects (jobs) with equal priorities, they are not retrieved
# in FIFO order due to the fact PriorityQueue is implemented using
# heapq which does not have sort stability. In order to ensure sort
# stability, we need to add a counter to ensure that objects with equal
# priorities are retrived in FIFO order.
# See also: https://bugs.python.org/issue17794
self.order_number = itertools.count()

def add_job(self, priority: int, job: ApiJob) -> None:
"""
Increment the queue's internal counter/order_number, assign an
order_number to the job to track its position in the queue,
and submit the job with its priority to the queue.
"""

current_order_number = next(self.order_number)
job.order_number = current_order_number
self.queue.put_nowait((priority, job))

@pyqtSlot()
def process(self) -> None: # pragma: nocover
Expand All @@ -29,19 +52,18 @@ def process(self) -> None: # pragma: nocover
def _process(self, exit_loop: bool) -> None:
while True:
session = self.session_maker()
# retry the "cached" job if it exists, otherwise get the next job
if self.last_job is not None:
job = self.last_job
self.last_job = None
else:
job = self.queue.get(block=True)
priority, job = self.queue.get(block=True)

try:
job._do_call_api(self.api_client, session)
except RequestTimeoutError:
# Reset number of remaining attempts for this job to the default
job.remaining_attempts = DEFAULT_NUM_ATTEMPTS
self.last_job = job # "cache" the last job since we can't re-queue it

# Resubmit job without modifying counter to ensure jobs with equal
# priorities are processed in the order that they were submitted
# _by the user_ to the queue.
self.queue.put_nowait((priority, job))
except ApiInaccessibleError:
# This is a guard against #397, we should pause the queue execution when this
# happens in the future and flag the situation to the user (see ticket #379).
Expand All @@ -54,6 +76,19 @@ def _process(self, exit_loop: bool) -> None:


class ApiJobQueue(QObject):
# These are the priorities for processing jobs.
# Lower numbers corresponds to a higher priority.
JOB_PRIORITIES = {
# LogoutJob: 11, # Not yet implemented
# MetadataSyncJob: 12, # Not yet implemented
FileDownloadJob: 13, # File downloads processed in separate queue
MessageDownloadJob: 13,
ReplyDownloadJob: 13,
# DeletionJob: 14, # Not yet implemented
SendReplyJob: 15,
UpdateStarJob: 16,
# FlagJob: 16, # Not yet implemented
}

def __init__(self, api_client: API, session_maker: scoped_session) -> None:
super().__init__(None)
Expand Down Expand Up @@ -100,9 +135,11 @@ def enqueue(self, job: ApiJob) -> None:
# First check the queues are started in case they died for some reason.
self.start_queues()

priority = self.JOB_PRIORITIES[type(job)]

if isinstance(job, FileDownloadJob):
logger.debug('Adding job to download queue')
self.download_file_queue.queue.put_nowait(job)
self.download_file_queue.add_job(priority, job)
else:
logger.debug('Adding job to main queue')
self.main_queue.queue.put_nowait(job)
self.main_queue.add_job(priority, job)
22 changes: 22 additions & 0 deletions tests/api_jobs/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,25 @@ def test_ApiJob_retry_timeout(mocker):

assert not api_job.success_signal.emit.called
assert not api_job.failure_signal.emit.called


def test_ApiJob_comparison(mocker):
return_value = 'wat'
api_job_cls = dummy_job_factory(mocker, return_value)
api_job_1 = api_job_cls()
api_job_1.order_number = 1

api_job_2 = api_job_cls()
api_job_2.order_number = 2

assert api_job_1 < api_job_2


def test_ApiJob_order_number_unset(mocker):
return_value = 'wat'
api_job_cls = dummy_job_factory(mocker, return_value)
api_job_1 = api_job_cls()
api_job_2 = api_job_cls()

with pytest.raises(ValueError):
api_job_1 < api_job_2
15 changes: 15 additions & 0 deletions tests/gui/test_widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,21 @@ def test_ConversationView_init(mocker, homedir):
assert isinstance(cv.conversation_layout, QVBoxLayout)


def test_ConversationView_refresh_conversation(mocker, homedir):
"""
Ensure that the session refreshes whenever there is a new reply in case there are previously
failed replies.
"""
source = factory.Source()
cv = ConversationView(source, mocker.MagicMock())
mocker.patch.object(cv, 'update_conversation')

cv.refresh_conversation()

cv.controller.session.refresh.assert_called_with(source)
cv.update_conversation.assert_called_once_with(source.collection)


def test_ConversationView_update_conversation_position_follow(mocker, homedir):
"""
Check the signal handler sets the correct value for the scrollbar to be
Expand Down
Loading