Skip to content

Commit

Permalink
Refactor API to use a single database client
Browse files Browse the repository at this point in the history
See issue #370

Previously, the Krake API created one etcd client for each request
received. Because of the etcd client used, a memory leak appeared:
the client would put too many elements in its cache, see
Revolution1/etcd3-py#141

To circumvent the issue, a single client is now used for the whole
Krake API. As this client leverages a pool of connections, it makes
sense to use only one such client. With this method, the API does not
show any sign of a memory leak.

Signed-off-by: Jean Chorin <[email protected]>
  • Loading branch information
jchorin committed Aug 14, 2020
1 parent eaafaab commit 0f78894
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 40 deletions.
29 changes: 24 additions & 5 deletions krake/krake/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
"""
import logging
import ssl
from functools import partial

from aiohttp import web, ClientSession
from krake.api.database import Session

from krake.data.core import RoleBinding
from . import __version__ as version
Expand Down Expand Up @@ -90,11 +93,7 @@ def create_app(config):
middlewares=[
middlewares.error_log(),
authentication,
middlewares.database(
host=config.etcd.host,
port=config.etcd.port,
retry=config.etcd.retry_transactions,
),
middlewares.retry_transaction(retry=config.etcd.retry_transactions),
],
)
app["config"] = config
Expand All @@ -103,6 +102,9 @@ def create_app(config):

# Cleanup contexts
app.cleanup_ctx.append(http_session)
app.cleanup_ctx.append(
partial(db_session, host=config.etcd.host, port=config.etcd.port)
)

# Routes
app.add_routes(routes)
Expand All @@ -113,6 +115,23 @@ def create_app(config):
return app


async def db_session(app, host, port):
    """Async generator creating a database :class:`krake.api.database.Session` that can
    be used by other components (middleware, route handlers) or the requests. The
    database session is available under the ``db`` key of the application.

    This function should be used as cleanup context (see
    :attr:`aiohttp.web.Application.cleanup_ctx`).

    Args:
        app (aiohttp.web.Application): Web application
        host (str): Host of the etcd server
        port (int): TCP port of the etcd server

    """
    # The session is entered once and stored on the application; the
    # ``async with`` block is only exited when the generator is resumed after
    # the ``yield``.
    async with Session(host=host, port=port) as session:
        app["db"] = session
        yield


async def http_session(app):
"""Async generator creating an :class:`aiohttp.ClientSession` HTTP session
that can be used by other components (middleware, route handlers). The HTTP
Expand Down
6 changes: 3 additions & 3 deletions krake/krake/api/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ def json_error(exc, content):
def session(request):
"""Load the database session for a given aiohttp request
Internally, it just returns the value that was assigned by
func:`krake.middlewares.database`.
Internally, it just returns the value that was given as cleanup context by
func:`krake.api.app.db_session`.
Args:
request (aiohttp.web.Request): HTTP request
Returns:
krake.database.Session: Database session for the given request
"""
return request["db"]
return request.app["db"]


class Heartbeat(object):
Expand Down
56 changes: 24 additions & 32 deletions krake/krake/api/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,42 @@
from aiohttp import web
from krake.api.helpers import HttpReason, HttpReasonCode

from .database import Session, TransactionError
from .database import TransactionError


def database(host, port, retry=1):
"""Middleware factory for per-request etcd database sessions and
transaction error handling.
def retry_transaction(retry=1):
"""Middleware factory for transaction error handling.
If an :class:`.database.TransactionError` occurs, the request handler is
retried for the specified number of times. If the transaction error
persists, a *409 Conflict* HTTP exception is raised.
If an :class:`.database.TransactionError` occurs, the request handler is retried for
the specified number of times. If the transaction error persists, a *409 Conflict*
HTTP exception is raised.
Args:
host (str): Host of the etcd server
port (int): TCP port of the etcd server
retry (int, optional): Number of retries if a transaction error
occurs.
retry (int, optional): Number of retries if a transaction error occurs.
Returns:
aiohttp middleware injecting an etcd database session into each HTTP
request and handling transaction errors.
coroutine: aiohttp middleware handling transaction errors.
"""
# TODO: Maybe we can share the TCP connection pool across all HTTP
# handlers (like for SQLAlchemy engines)
@web.middleware
async def database_middleware(request, handler):
async with Session(host=host, port=port) as session:
request["db"] = session

for _ in range(retry + 1):
try:
return await handler(request)
except TransactionError as err:
request.app.logger.warn("Transaction failed (%s)", err)

reason = HttpReason(
reason="Concurrent writes to database",
code=HttpReasonCode.TRANSACTION_ERROR,
)
raise web.HTTPConflict(
text=json.dumps(reason.serialize()), content_type="application/json"
)

return database_middleware
async def retry_transaction_middleware(request, handler):
for _ in range(retry + 1):
try:
return await handler(request)
except TransactionError as err:
request.app.logger.warn("Transaction failed (%s)", err)

reason = HttpReason(
reason="Concurrent writes to database",
code=HttpReasonCode.TRANSACTION_ERROR,
)
raise web.HTTPConflict(
text=json.dumps(reason.serialize()), content_type="application/json"
)

return retry_transaction_middleware


def error_log():
Expand Down

0 comments on commit 0f78894

Please sign in to comment.