Commit d52568e

Remove legacy db entirely (#236)

Signed-off-by: H.Shay <[email protected]>
H-Shay authored Aug 4, 2021
1 parent c0cb167 commit d52568e

Showing 12 changed files with 26 additions and 390 deletions.
33 changes: 1 addition & 32 deletions .github/workflows/pipeline.yml
@@ -25,7 +25,7 @@ jobs:
       - run: python -m pip install tox
       - run: tox -e check_types

-  run-unit-tests-sqlite:
+  run-unit-tests:
     needs: [check-code-style, check-types-mypy]
     runs-on: ubuntu-latest
     steps:
@@ -35,34 +35,3 @@ jobs:
           python-version: "3.7"
       - run: python -m pip install -e .
       - run: trial tests
-
-  run-unit-tests-postgres:
-    needs: [check-code-style, check-types-mypy]
-    runs-on: ubuntu-latest
-
-    services:
-      postgres:
-        image: postgres:11
-        ports:
-          - 5432:5432
-        env:
-          POSTGRES_PASSWORD: "postgres"
-          POSTGRES_INITDB_ARGS: "--lc-collate C --lc-ctype C --encoding UTF8"
-        options: >-
-          --health-cmd pg_isready
-          --health-interval 10s
-          --health-timeout 5s
-          --health-retries 5
-    steps:
-      - uses: actions/checkout@v2
-      - uses: actions/setup-python@v2
-        with:
-          python-version: "3.7"
-      - run: python -m pip install -e .
-      - run: trial tests
-        env:
-          TEST_USE_POSTGRES: "yes"
-          TEST_POSTGRES_USER: "postgres"
-          TEST_POSTGRES_PASSWORD: "postgres"
-          TEST_POSTGRES_HOST: "localhost"
10 changes: 0 additions & 10 deletions CONTRIBUTING.md
@@ -4,16 +4,6 @@ Everyone is welcome to contribute code to Sygnal, provided you are willing to
 license your contributions under the same license as the project itself. In
 this case, the [Apache Software License v2](LICENSE).

-## Preparing your development environment
-
-Sygnal depends on the `psycopg2` database adapter for PostgreSQL.
-You may need to install development headers for Python and libpq.
-For example on Debian/Ubuntu distributions these can be installed with:
-
-```bash
-sudo apt install libpq-dev python3-dev build-essential
-```
-
 ### Create a virtualenv

 To contribute to Sygnal, ensure you have Python 3.7 or newer and then run:
3 changes: 0 additions & 3 deletions README.rst
@@ -23,9 +23,6 @@ To change this, set the ``SYGNAL_CONF`` environment variable to the path to your
 A sample configuration file is provided in this repository;
 see ``sygnal.yaml.sample``.

-Sygnal supports using either SQLite3 or PostgreSQL as a database backend. See the ``sygnal.yaml.sample``
-for more information on how to configure.
-
 The `apps:` section is where you set up different apps that are to be handled.
 Each app should be given its own subsection, with the key of that subsection being the app's ``app_id``.
 Keys in this section take the form of the ``app_id``, as specified when setting up a Matrix pusher
1 change: 1 addition & 0 deletions changelog.d/236.misc
@@ -0,0 +1 @@
+Remove legacy database to ease horizontal scaling.
8 changes: 1 addition & 7 deletions docker/Dockerfile
@@ -17,12 +17,7 @@ FROM python:3.7-slim as builder
 # (we really just care about caching a wheel here, as the "pip install" below
 # will install them again.)

-# we need dependencies for postgresql
-RUN apt-get update && apt-get install -y gcc git libpq-dev
-
-RUN pip install --prefix="/install" --no-warn-script-location cryptography
-
-# now install sygnal and all of the python deps to /install.
+# install sygnal and all of the python deps to /install.

 COPY . /sygnal/

@@ -33,7 +28,6 @@ RUN pip install --prefix="/install" --no-warn-script-location /sygnal
 ###

 FROM python:3.7-slim
-RUN apt-get update && apt-get install -y libpq5 && apt-get clean
 COPY --from=builder /install /usr/local

 EXPOSE 5000/tcp
3 changes: 0 additions & 3 deletions mypy.ini
@@ -35,9 +35,6 @@ ignore_missing_imports = True
 [mypy-importlib_metadata]
 ignore_missing_imports = True

-[mypy-psycopg2]
-ignore_missing_imports = True
-
 [mypy-setuptools]
 ignore_missing_imports = True

1 change: 0 additions & 1 deletion setup.py
@@ -50,7 +50,6 @@ def read(fname: Union[str, "PathLike[str]"]) -> str:
         "sentry-sdk>=0.10.2",
         "zope.interface>=4.6.0",
         "idna>=2.8",
-        "psycopg2>=2.8.4",
         "importlib_metadata",
         "pywebpush>=1.13.0",
         "py-vapid>=1.7.0",
38 changes: 0 additions & 38 deletions sygnal.yaml.sample
@@ -3,44 +3,6 @@
 # See: matrix.org
 ##

-# The 'database' setting defines the database that sygnal uses to store all of
-# its data.
-#
-# 'name' gives the database engine to use: either 'sqlite3' (for SQLite) or
-# 'psycopg2' (for PostgreSQL).
-#
-# 'args' gives options which are passed through to the database engine,
-# except for options starting 'cp_', which are used to configure the Twisted
-# connection pool. For a reference to valid arguments, see:
-#   * for sqlite: https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
-#   * for postgres: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
-#   * for the connection pool: https://twistedmatrix.com/documents/current/api/twisted.enterprise.adbapi.ConnectionPool.html#__init__
-#
-#
-# Example SQLite configuration:
-#
-#database:
-#  name: sqlite3
-#  args:
-#    dbfile: /path/to/database.db
-#
-#
-# Example Postgres configuration:
-#
-#database:
-#  name: psycopg2
-#  args:
-#    host: localhost
-#    database: sygnal
-#    user: sygnal
-#    password: pass
-#    cp_min: 1
-#    cp_max: 5
-#
-database:
-  name: sqlite3
-  args:
-    dbfile: sygnal.db

 ## Logging #
 #
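For context on the section deleted above: the sample documented that non-`cp_` args were passed through to the database driver, while `cp_`-prefixed args configured the Twisted connection pool. A minimal sketch of that mapping, using Twisted's `adbapi` directly (illustrative only, not Sygnal code; the sample's `dbfile` key corresponds to the `database` argument of `sqlite3.connect()`):

```python
# Illustrative sketch: how a config like the removed SQLite example maps onto
# twisted.enterprise.adbapi.ConnectionPool. Values are the removed sample's.
from twisted.enterprise import adbapi

pool = adbapi.ConnectionPool(
    "sqlite3",             # DB-API module name; "psycopg2" for PostgreSQL
    database="sygnal.db",  # passed through to sqlite3.connect()
    cp_min=1,              # 'cp_'-prefixed options are consumed by the
    cp_max=5,              # ConnectionPool itself rather than the driver
)
```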
129 changes: 8 additions & 121 deletions sygnal/gcmpushkin.py
@@ -21,7 +21,6 @@

 from opentracing import logs, tags
 from prometheus_client import Counter, Gauge, Histogram
-from twisted.enterprise.adbapi import ConnectionPool
 from twisted.internet.defer import DeferredSemaphore
 from twisted.web.client import FileBodyProducer, HTTPConnectionPool, readBody
 from twisted.web.http_headers import Headers
@@ -99,7 +98,7 @@ class GcmPushkin(ConcurrencyLimitedPushkin):
         "max_connections",
     } | ConcurrencyLimitedPushkin.UNDERSTOOD_CONFIG_FIELDS

-    def __init__(self, name, sygnal, config, canonical_reg_id_store):
+    def __init__(self, name, sygnal, config):
        super(GcmPushkin, self).__init__(name, sygnal, config)

        nonunderstood = set(self.cfg.keys()).difference(self.UNDERSTOOD_CONFIG_FIELDS)
Expand Down Expand Up @@ -128,9 +127,6 @@ def __init__(self, name, sygnal, config, canonical_reg_id_store):
proxy_url_str=proxy_url,
)

self.db = sygnal.database
self.canonical_reg_id_store = canonical_reg_id_store

self.api_key = self.get_config("api_key")
if not self.api_key:
raise PushkinSetupException("No API key set in config")
@@ -154,14 +150,7 @@ async def create(cls, name, sygnal, config):
         Returns:
             an instance of this Pushkin
         """
-        logger.debug("About to set up CanonicalRegId Store")
-        canonical_reg_id_store = CanonicalRegIdStore(
-            sygnal.database, sygnal.database_engine
-        )
-        await canonical_reg_id_store.setup()
-        logger.debug("Finished setting up CanonicalRegId Store")
-
-        return cls(name, sygnal, config, canonical_reg_id_store)
+        return cls(name, sygnal, config)

     async def _perform_http_request(self, body, headers):
         """
@@ -276,11 +265,6 @@ async def _request_dispatch(self, n, log, body, headers, pushkeys, span):
             # determine which pushkeys to retry or forget about
             new_pushkeys = []
             for i, result in enumerate(resp_object["results"]):
-                span.set_tag("gcm_regid_updated", "registration_id" in result)
-                if "registration_id" in result:
-                    await self.canonical_reg_id_store.set_canonical_id(
-                        pushkeys[i], result["registration_id"]
-                    )
                 if "error" in result:
                     log.warning(
                         "Error for pushkey %s: %s", pushkeys[i], result["error"]
@@ -333,30 +317,13 @@ async def _dispatch_notification_unlimited(self, n, device, context):
         with self.sygnal.tracer.start_span(
             "gcm_dispatch", tags=span_tags, child_of=context.opentracing_span
         ) as span_parent:
-            reg_id_mappings = await self.canonical_reg_id_store.get_canonical_ids(
-                pushkeys
-            )
-
-            reg_id_mappings = {
-                reg_id: canonical_reg_id or reg_id
-                for (reg_id, canonical_reg_id) in reg_id_mappings.items()
-            }
-
-            inverse_reg_id_mappings = {v: k for (k, v) in reg_id_mappings.items()}
-
             data = GcmPushkin._build_data(n, device)
             headers = {
                 b"User-Agent": ["sygnal"],
                 b"Content-Type": ["application/json"],
                 b"Authorization": ["key=%s" % (self.api_key,)],
             }

-            # count the number of remapped registration IDs in the request
-            span_parent.set_tag(
-                "gcm_num_remapped_reg_ids_used",
-                [k != v for (k, v) in reg_id_mappings.items()].count(True),
-            )
-
             # TODO: Implement collapse_key to queue only one message per room.
             failed = []

@@ -365,14 +332,12 @@ async def _dispatch_notification_unlimited(self, n, device, context):
             body["priority"] = "normal" if n.prio == "low" else "high"

             for retry_number in range(0, MAX_TRIES):
-                mapped_pushkeys = [reg_id_mappings[pk] for pk in pushkeys]
-
                 if len(pushkeys) == 1:
-                    body["to"] = mapped_pushkeys[0]
+                    body["to"] = pushkeys[0]
                 else:
-                    body["registration_ids"] = mapped_pushkeys
+                    body["registration_ids"] = pushkeys

-                log.info("Sending (attempt %i) => %r", retry_number, mapped_pushkeys)
+                log.info("Sending (attempt %i) => %r", retry_number, pushkeys)

                 try:
                     span_tags = {"retry_num": retry_number}
Expand All @@ -381,13 +346,11 @@ async def _dispatch_notification_unlimited(self, n, device, context):
"gcm_dispatch_try", tags=span_tags, child_of=span_parent
) as span:
new_failed, new_pushkeys = await self._request_dispatch(
n, log, body, headers, mapped_pushkeys, span
n, log, body, headers, pushkeys, span
)
pushkeys = new_pushkeys
failed += [
inverse_reg_id_mappings[canonical_pk]
for canonical_pk in new_failed
]
failed += new_failed

if len(pushkeys) == 0:
break
except TemporaryNotificationDispatchException as exc:
@@ -459,79 +422,3 @@ def _build_data(n, device):
             data["missed_calls"] = n.counts.missed_calls

         return data
-
-
-class CanonicalRegIdStore(object):
-    TABLE_CREATE_QUERY = """
-        CREATE TABLE IF NOT EXISTS gcm_canonical_reg_id (
-            reg_id TEXT PRIMARY KEY,
-            canonical_reg_id TEXT NOT NULL
-        );
-        """
-
-    def __init__(self, db: ConnectionPool, engine: str):
-        """
-        Args:
-            db (adbapi.ConnectionPool): database to prepare
-            engine (str):
-                Database engine to use. Should be either "sqlite" or "postgresql".
-        """
-        self.db = db
-        self.engine = engine
-
-    async def setup(self):
-        """
-        Prepares, if necessary, the database for storing canonical registration IDs.
-        Separate method from the constructor because we wait for an async request
-        to complete, so it must be an `async def` method.
-        """
-        await self.db.runOperation(self.TABLE_CREATE_QUERY)
-
-    async def set_canonical_id(self, reg_id, canonical_reg_id):
-        """
-        Associates a GCM registration ID with a canonical registration ID.
-        Args:
-            reg_id (str): a registration ID
-            canonical_reg_id (str): the canonical registration ID for `reg_id`
-        """
-        if self.engine == "sqlite":
-            await self.db.runOperation(
-                "INSERT OR REPLACE INTO gcm_canonical_reg_id VALUES (?, ?);",
-                (reg_id, canonical_reg_id),
-            )
-        else:
-            await self.db.runOperation(
-                """
-                INSERT INTO gcm_canonical_reg_id VALUES (%s, %s)
-                ON CONFLICT (reg_id) DO UPDATE
-                    SET canonical_reg_id = EXCLUDED.canonical_reg_id;
-                """,
-                (reg_id, canonical_reg_id),
-            )
-
-    async def get_canonical_ids(self, reg_ids):
-        """
-        Retrieves the canonical registration ID for multiple registration IDs.
-        Args:
-            reg_ids (iterable): registration IDs to retrieve canonical registration
-                IDs for.
-        Returns (dict):
-            mapping of registration ID to either its canonical registration ID,
-            or `None` if there is no entry.
-        """
-        parameter_key = "?" if self.engine == "sqlite" else "%s"
-        rows = dict(
-            await self.db.runQuery(
-                """
-                SELECT reg_id, canonical_reg_id
-                FROM gcm_canonical_reg_id
-                WHERE reg_id IN (%s)
-                """
-                % (",".join(parameter_key for _ in reg_ids)),
-                reg_ids,
-            )
-        )
-        return {reg_id: dict(rows).get(reg_id) for reg_id in reg_ids}
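Two details of the removed `CanonicalRegIdStore` are worth noting: the upsert is written twice because SQLite (`INSERT OR REPLACE`) and PostgreSQL (`ON CONFLICT ... DO UPDATE`) spell it differently, and the `IN` clause is built by joining one placeholder per value, so the values themselves are still bound as query parameters. A standalone sketch of the placeholder technique (hypothetical values, not Sygnal code):

```python
# Hypothetical sketch of the placeholder-joining technique from the removed
# get_canonical_ids(): only "?" characters and commas are interpolated into
# the SQL text; the values are bound separately, so there is no injection risk.
reg_ids = ["reg-a", "reg-b", "reg-c"]
parameter_key = "?"  # sqlite3 placeholder style; psycopg2 uses "%s"

query = (
    "SELECT reg_id, canonical_reg_id FROM gcm_canonical_reg_id "
    "WHERE reg_id IN (%s)" % ",".join(parameter_key for _ in reg_ids)
)
# query is now "... WHERE reg_id IN (?,?,?)"; it would be executed as e.g.
# cursor.execute(query, reg_ids)
```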