Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add database config class #6513

Merged
merged 7 commits into from
Dec 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6513.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove all assumptions of there being a single phyiscal DB apart from the `synapse.config`.
9 changes: 2 additions & 7 deletions scripts-dev/update_database
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.prepare_database import prepare_database

logger = logging.getLogger("update_database")

Expand Down Expand Up @@ -77,12 +76,8 @@ if __name__ == "__main__":
# Instantiate and initialise the homeserver object.
hs = MockHomeserver(config)

db_conn = hs.get_db_conn()
# Update the database to the latest schema.
prepare_database(db_conn, hs.database_engine, config=config)
db_conn.commit()

# setup instantiates the store within the homeserver object.
# Setup instantiates the store within the homeserver object and updates the
# DB.
hs.setup()
store = hs.get_datastore()

Expand Down
58 changes: 24 additions & 34 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor

from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
Expand All @@ -55,7 +56,7 @@ from synapse.storage.data_stores.main.stats import StatsStore
from synapse.storage.data_stores.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
from synapse.storage.database import Database
from synapse.storage.database import Database, make_conn
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
Expand Down Expand Up @@ -165,23 +166,17 @@ class Store(


class MockHomeserver:
def __init__(self, config, database_engine, db_conn, db_pool):
self.database_engine = database_engine
self.db_conn = db_conn
self.db_pool = db_pool
def __init__(self, config):
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name

def get_db_conn(self):
return self.db_conn

def get_db_pool(self):
return self.db_pool

def get_clock(self):
return self.clock

def get_reactor(self):
return reactor


class Porter(object):
def __init__(self, **kwargs):
Expand Down Expand Up @@ -445,45 +440,36 @@ class Porter(object):
else:
return

def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{
k: v
for k, v in db_config.get("args", {}).items()
if not k.startswith("cp_")
}
)

prepare_database(db_conn, database_engine, config=None)
def setup_db(self, db_config: DatabaseConnectionConfig, engine):
db_conn = make_conn(db_config, engine)
prepare_database(db_conn, engine, config=None)

db_conn.commit()

return db_conn

@defer.inlineCallbacks
def build_db_store(self, config):
def build_db_store(self, db_config: DatabaseConnectionConfig):
"""Builds and returns a database store using the provided configuration.

Args:
config: The database configuration, i.e. a dict following the structure of
the "database" section of Synapse's configuration file.
config: The database configuration

Returns:
The built Store object.
"""
engine = create_engine(config)

self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)
self.progress.set_state("Preparing %s" % db_config.config["name"])

db_pool = adbapi.ConnectionPool(config["name"], **config["args"])
engine = create_engine(db_config.config)
conn = self.setup_db(db_config, engine)

hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
hs = MockHomeserver(self.hs_config)

store = Store(Database(hs), conn, hs)
store = Store(Database(hs, db_config, engine), conn, hs)

yield store.db.runInteraction(
"%s_engine.check_database" % config["name"], engine.check_database,
"%s_engine.check_database" % db_config.config["name"],
engine.check_database,
)

return store
Expand All @@ -509,7 +495,11 @@ class Porter(object):
@defer.inlineCallbacks
def run(self):
try:
self.sqlite_store = yield self.build_db_store(self.sqlite_config)
self.sqlite_store = yield self.build_db_store(
DatabaseConnectionConfig(
"master", self.sqlite_config, data_stores=["main"]
)
)

# Check if all background updates are done, abort if not.
updates_complete = (
Expand All @@ -524,7 +514,7 @@ class Porter(object):
defer.returnValue(None)

self.postgres_store = yield self.build_db_store(
self.hs_config.database_config
self.hs_config.get_single_database()
)

yield self.run_background_updates_on_postgres()
Expand Down
78 changes: 62 additions & 16 deletions synapse/config/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,43 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from textwrap import indent
from typing import List

import yaml

from ._base import Config
from synapse.config._base import Config, ConfigError

logger = logging.getLogger(__name__)


class DatabaseConnectionConfig:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might make more sense as an attr.s ? Just a thought

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought about that, but then I find the validation stuff non intuitive.

"""Contains the connection config for a particular database.

Args:
name: A label for the database, used for logging.
db_config: The config for a particular database, as per `database`
section of main config. Has two fields: `name` for database
module name, and `args` for the args to give to the database
connector.
data_stores: The list of data stores that should be provisioned on the
database.
"""

def __init__(self, name: str, db_config: dict, data_stores: List[str]):
if db_config["name"] not in ("sqlite3", "psycopg2"):
raise ConfigError("Unsupported database type %r" % (db_config["name"],))

if db_config["name"] == "sqlite3":
db_config.setdefault("args", {}).update(
{"cp_min": 1, "cp_max": 1, "check_same_thread": False}
)

self.name = name
self.config = db_config
self.data_stores = data_stores


class DatabaseConfig(Config):
Expand All @@ -26,20 +57,14 @@ class DatabaseConfig(Config):
def read_config(self, config, **kwargs):
self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K"))

self.database_config = config.get("database")
database_config = config.get("database")

if self.database_config is None:
self.database_config = {"name": "sqlite3", "args": {}}
if database_config is None:
database_config = {"name": "sqlite3", "args": {}}

name = self.database_config.get("name", None)
if name == "psycopg2":
pass
elif name == "sqlite3":
self.database_config.setdefault("args", {}).update(
{"cp_min": 1, "cp_max": 1, "check_same_thread": False}
)
else:
raise RuntimeError("Unsupported database type '%s'" % (name,))
self.databases = [
DatabaseConnectionConfig("master", database_config, data_stores=["main"])
richvdh marked this conversation as resolved.
Show resolved Hide resolved
]

self.set_databasepath(config.get("database_path"))

Expand Down Expand Up @@ -76,11 +101,24 @@ def read_arguments(self, args):
self.set_databasepath(args.database_path)

def set_databasepath(self, database_path):
richvdh marked this conversation as resolved.
Show resolved Hide resolved
if database_path is None:
return

if database_path != ":memory:":
database_path = self.abspath(database_path)
if self.database_config.get("name", None) == "sqlite3":
if database_path is not None:
self.database_config["args"]["database"] = database_path

# We only support setting a database path if we have a single sqlite3
# database.
if len(self.databases) != 1:
raise ConfigError("Cannot specify 'database_path' with multiple databases")

database = self.get_single_database()
if database.config["name"] != "sqlite3":
# We don't raise here as we haven't done so before for this case.
logger.warn("Ignoring 'database_path' for non-sqlite3 database")
return

database.config["args"]["database"] = database_path

@staticmethod
def add_arguments(parser):
Expand All @@ -91,3 +129,11 @@ def add_arguments(parser):
metavar="SQLITE_DATABASE_PATH",
help="The path to a sqlite database to use.",
)

def get_single_database(self) -> DatabaseConnectionConfig:
"""Returns the database if there is only one, useful for e.g. tests
"""
if len(self.databases) != 1:
raise Exception("More than one database exists")

return self.databases[0]
2 changes: 1 addition & 1 deletion synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def _on_shutdown(self):
is some spurious presence changes that will self-correct.
"""
# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
if not self.store.database.is_running():
return

logger.info(
Expand Down
41 changes: 4 additions & 37 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import logging
import os

from twisted.enterprise import adbapi
from twisted.mail.smtp import sendmail
from twisted.web.client import BrowserLikePolicyForHTTPS

Expand Down Expand Up @@ -98,7 +97,6 @@
)
from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import DataStores, Storage
from synapse.storage.engines import create_engine
from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
Expand Down Expand Up @@ -134,7 +132,6 @@ def build_DEPENDENCY(self)

DEPENDENCIES = [
"http_client",
"db_pool",
"federation_client",
"federation_server",
"handlers",
Expand Down Expand Up @@ -233,12 +230,6 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
self.admin_redaction_ratelimiter = Ratelimiter()
self.registration_ratelimiter = Ratelimiter()

self.database_engine = create_engine(config.database_config)
config.database_config.setdefault("args", {})[
"cp_openfun"
] = self.database_engine.on_new_connection
self.db_config = config.database_config

self.datastores = None

# Other kwargs are explicit dependencies
Expand All @@ -247,10 +238,8 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar

def setup(self):
logger.info("Setting up.")
with self.get_db_conn() as conn:
self.datastores = DataStores(self.DATASTORE_CLASS, conn, self)
conn.commit()
self.start_time = int(self.get_clock().time())
self.datastores = DataStores(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")

def setup_master(self):
Expand Down Expand Up @@ -284,6 +273,9 @@ def get_clock(self):
def get_datastore(self):
return self.datastores.main

def get_datastores(self):
return self.datastores

def get_config(self):
return self.config

Expand Down Expand Up @@ -433,31 +425,6 @@ def build_http_client(self):
)
return MatrixFederationHttpClient(self, tls_client_options_factory)

def build_db_pool(self):
name = self.db_config["name"]

return adbapi.ConnectionPool(
name, cp_reactor=self.get_reactor(), **self.db_config.get("args", {})
)

def get_db_conn(self, run_new_connection=True):
"""Makes a new connection to the database, skipping the db pool

Returns:
Connection: a connection object implementing the PEP-249 spec
"""
# Any param beginning with cp_ is a parameter for adbapi, and should
# not be passed to the database engine.
db_params = {
k: v
for k, v in self.db_config.get("args", {}).items()
if not k.startswith("cp_")
}
db_conn = self.database_engine.module.connect(**db_params)
if run_new_connection:
self.database_engine.on_new_connection(db_conn)
return db_conn

def build_media_repository_resource(self):
# build the media repo resource. This indirects through the HomeServer
# to ensure that we only have a single instance of
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SQLBaseStore(object):
def __init__(self, database: Database, db_conn, hs):
self.hs = hs
self._clock = hs.get_clock()
self.database_engine = hs.database_engine
self.database_engine = database.engine
self.db = database
self.rand = random.SystemRandom()

Expand Down
Loading