Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a larger, dedicated threadpool for media sending #17564

Merged
merged 3 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17564.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up responding to media requests.
19 changes: 13 additions & 6 deletions synapse/media/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@
from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import (
defer_to_thread,
defer_to_threadpool,
make_deferred_yieldable,
run_in_background,
)
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import is_ascii

if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.databases.main.media_repository import LocalMedia


Expand Down Expand Up @@ -132,6 +132,7 @@ def respond_404(request: SynapseRequest) -> None:


async def respond_with_file(
hs: "HomeServer",
request: SynapseRequest,
media_type: str,
file_path: str,
Expand All @@ -148,7 +149,7 @@ async def respond_with_file(
add_file_headers(request, media_type, file_size, upload_name)

with open(file_path, "rb") as f:
await ThreadedFileSender(request.reactor).beginFileTransfer(f, request)
await ThreadedFileSender(hs).beginFileTransfer(f, request)

finish_request(request)
else:
Expand Down Expand Up @@ -632,8 +633,9 @@ class ThreadedFileSender:
# read.
TIMEOUT_SECONDS = 90.0

def __init__(self, reactor: ISynapseReactor) -> None:
self.reactor = reactor
def __init__(self, hs: "HomeServer") -> None:
self.reactor = hs.get_reactor()
self.thread_pool = hs.get_media_sender_thread_pool()

self.file: Optional[BinaryIO] = None
self.deferred: "Deferred[None]" = Deferred()
Expand Down Expand Up @@ -661,7 +663,12 @@ def beginFileTransfer(

# We set the wakeup signal as we should start producing immediately.
self.wakeup_event.set()
run_in_background(defer_to_thread, self.reactor, self._on_thread_read_loop)
run_in_background(
defer_to_threadpool,
self.reactor,
self.thread_pool,
self._on_thread_read_loop,
)

return make_deferred_yieldable(self.deferred)

Expand Down
12 changes: 5 additions & 7 deletions synapse/media/media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer

from ..types import ISynapseReactor, JsonDict
from ..types import JsonDict
from ._base import FileInfo, Responder
from .filepath import MediaFilePaths

Expand Down Expand Up @@ -209,7 +209,7 @@ async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
local_path = os.path.join(self.local_media_directory, path)
if os.path.exists(local_path):
logger.debug("responding with local file %s", local_path)
return FileResponder(self.reactor, open(local_path, "rb"))
return FileResponder(self.hs, open(local_path, "rb"))
logger.debug("local file %s did not exist", local_path)

for provider in self.storage_providers:
Expand Down Expand Up @@ -332,14 +332,12 @@ class FileResponder(Responder):
is closed when finished streaming.
"""

def __init__(self, reactor: ISynapseReactor, open_file: BinaryIO):
self.reactor = reactor
def __init__(self, hs: "HomeServer", open_file: BinaryIO):
self.hs = hs
self.open_file = open_file

def write_to_consumer(self, consumer: IConsumer) -> Deferred:
return ThreadedFileSender(self.reactor).beginFileTransfer(
self.open_file, consumer
)
return ThreadedFileSender(self.hs).beginFileTransfer(self.open_file, consumer)

def __exit__(
self,
Expand Down
2 changes: 1 addition & 1 deletion synapse/media/storage_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:

backup_fname = os.path.join(self.base_directory, path)
if os.path.isfile(backup_fname):
return FileResponder(self.reactor, open(backup_fname, "rb"))
return FileResponder(self.hs, open(backup_fname, "rb"))

return None

Expand Down
6 changes: 3 additions & 3 deletions synapse/media/thumbnailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,11 @@ async def select_or_generate_local_thumbnail(
await respond_with_multipart_responder(
self.hs.get_clock(),
request,
FileResponder(self.reactor, open(file_path, "rb")),
FileResponder(self.hs, open(file_path, "rb")),
media_info,
)
else:
await respond_with_file(request, desired_type, file_path)
await respond_with_file(self.hs, request, desired_type, file_path)
else:
logger.warning("Failed to generate thumbnail")
raise SynapseError(400, "Failed to generate thumbnail.")
Expand Down Expand Up @@ -456,7 +456,7 @@ async def select_or_generate_remote_thumbnail(
)

if file_path:
await respond_with_file(request, desired_type, file_path)
await respond_with_file(self.hs, request, desired_type, file_path)
else:
logger.warning("Failed to generate thumbnail")
raise SynapseError(400, "Failed to generate thumbnail.")
Expand Down
19 changes: 19 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from twisted.internet.interfaces import IOpenSSLContextFactory
from twisted.internet.tcp import Port
from twisted.python.threadpool import ThreadPool
from twisted.web.iweb import IPolicyForHTTPS
from twisted.web.resource import Resource

Expand Down Expand Up @@ -941,3 +942,21 @@ def get_worker_locks_handler(self) -> WorkerLocksHandler:
@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)

@cache_in_self
def get_media_sender_thread_pool(self) -> ThreadPool:
    """Return the thread pool used to read files for media download responses.

    The pool is started before it is handed back, and a system event
    trigger is registered on the reactor so that the pool is stopped
    during shutdown.

    Returns:
        The started thread pool, named ``media_threadpool``, configured
        with between 1 and 50 threads.
    """
    # These threads mostly wait on IO rather than doing CPU work, so a
    # generous upper bound on the pool size is fine.
    pool = ThreadPool(minthreads=1, maxthreads=50, name="media_threadpool")
    pool.start()

    # Make sure the worker threads are stopped when the reactor shuts down.
    reactor = self.get_reactor()
    reactor.addSystemEventTrigger("during", "shutdown", pool.stop)

    return pool
6 changes: 6 additions & 0 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,12 @@ async def validate_hash(p: str, h: str) -> bool:

hs.get_auth_handler().validate_hash = validate_hash # type: ignore[assignment]

# We need to replace the media threadpool with the fake test threadpool.
def thread_pool() -> threadpool.ThreadPool:
    """Return the thread pool of the ``reactor`` from the enclosing scope.

    This is assigned over ``hs.get_media_sender_thread_pool`` just below so
    that media sending uses the fake test threadpool.
    """
    return reactor.getThreadPool()

hs.get_media_sender_thread_pool = thread_pool # type: ignore[method-assign]

# Load any configured modules into the homeserver
module_api = hs.get_module_api()
for module, module_config in hs.config.modules.loaded_modules:
Expand Down
Loading