Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Re-introduce the outbound federation proxy #15913

Merged
merged 13 commits into from
Jul 18, 2023
Merged
1 change: 1 addition & 0 deletions changelog.d/15913.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`.
33 changes: 26 additions & 7 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3960,13 +3960,14 @@ federation_sender_instances:
---
### `instance_map`

When using workers this should be a map from [`worker_name`](#worker_name) to the
HTTP replication listener of the worker, if configured, and to the main process.
Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
a HTTP replication listener, and that listener should be included in the `instance_map`.
The main process also needs an entry on the `instance_map`, and it should be listed under
`main` **if even one other worker exists**. Ensure the port matches with what is declared
inside the `listener` block for a `replication` listener.
When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP
replication listener of the worker, if configured, and to the main process. Each worker
declared under [`stream_writers`](../../workers.md#stream-writers) and
[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP
replication listener, and that listener should be included in the `instance_map`. The
main process also needs an entry on the `instance_map`, and it should be listed under
`main` **if even one other worker exists**. Ensure the port matches with what is
declared inside the `listener` block for a `replication` listener.


Example configuration:
Expand Down Expand Up @@ -4004,6 +4005,24 @@ stream_writers:
typing: worker1
```
---
### `outbound_federation_restricted_to`

When using workers, you can restrict outbound federation traffic to only go through a
specific subset of workers. Any worker specified here must also be in the
[`instance_map`](#instance_map).
[`worker_replication_secret`](#worker_replication_secret) must also be configured to
authorize inter-worker communication.

```yaml
outbound_federation_restricted_to:
- federation_sender1
- federation_sender2
```

Also see the [worker
documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers)
for more info.
---
### `run_background_tasks_on`

The [worker](../../workers.md#background-tasks) that is used to run
Expand Down
24 changes: 24 additions & 0 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,30 @@ the stream writer for the `presence` stream:

^/_matrix/client/(api/v1|r0|v3|unstable)/presence/

#### Restrict outbound federation traffic to a specific set of workers

The
[`outbound_federation_restricted_to`](usage/configuration/config_documentation.md#outbound_federation_restricted_to)
configuration is useful to make sure outbound federation traffic only goes through a
specified subset of workers. This allows you to set more strict access controls (like a
firewall) for all workers and only allow the `federation_sender`'s to contact the
outside world.

```yaml
instance_map:
main:
host: localhost
port: 8030
federation_sender1:
host: localhost
port: 8034

outbound_federation_restricted_to:
- federation_sender1

worker_replication_secret: "secret_secret"
```

#### Background tasks

There is also support for moving background tasks to a separate
Expand Down
7 changes: 7 additions & 0 deletions synapse/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ def __init__(self, msg: str):
super().__init__(HTTPStatus.BAD_REQUEST, msg, Codes.BAD_JSON)


class InvalidProxyCredentialsError(SynapseError):
"""Error raised when the proxy credentials are invalid."""

def __init__(self, msg: str, errcode: str = Codes.UNKNOWN):
super().__init__(401, msg, errcode)


class ProxiedRequestError(SynapseError):
"""An error from a general matrix endpoint, eg. from a proxied Matrix API call.

Expand Down
2 changes: 2 additions & 0 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ def listen_unix(


def listen_http(
hs: "HomeServer",
listener_config: ListenerConfig,
root_resource: Resource,
version_string: str,
Expand All @@ -406,6 +407,7 @@ def listen_http(
version_string,
max_request_body_size=max_request_body_size,
reactor=reactor,
hs=hs,
)

if isinstance(listener_config, TCPListenerConfig):
Expand Down
1 change: 1 addition & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
root_resource = create_resource_tree(resources, OptionsResource())

_base.listen_http(
self,
listener_config,
root_resource,
self.version_string,
Expand Down
1 change: 1 addition & 0 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def _listener_http(
root_resource = OptionsResource()

ports = listen_http(
self,
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
Expand Down
45 changes: 44 additions & 1 deletion synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import argparse
import logging
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Union

import attr
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
Expand Down Expand Up @@ -171,6 +171,27 @@ class WriterLocations:
)


@attr.s(auto_attribs=True)
class OutboundFederationRestrictedTo:
"""Whether we limit outbound federation to a certain set of instances.

Attributes:
instances: optional list of instances that can make outbound federation
requests. If None then all instances can make federation requests.
locations: list of instance locations to connect to proxy via.
"""

instances: Optional[List[str]]
locations: List[InstanceLocationConfig] = attr.Factory(list)

def __contains__(self, instance: str) -> bool:
# It feels a bit dirty to return `True` if `instances` is `None`, but it makes
# sense in downstream usage in the sense that if
# `outbound_federation_restricted_to` is not configured, then any instance can
# talk to federation (no restrictions so always return `True`).
return self.instances is None or instance in self.instances


class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
Expand Down Expand Up @@ -385,6 +406,28 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
new_option_name="update_user_directory_from_worker",
)

outbound_federation_restricted_to = config.get(
"outbound_federation_restricted_to", None
)
self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
outbound_federation_restricted_to
)
if outbound_federation_restricted_to:
if not self.worker_replication_secret:
raise ConfigError(
"`worker_replication_secret` must be configured when using `outbound_federation_restricted_to`."
)
Comment on lines +416 to +419
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we okay with re-using worker_replication_secret for use with Proxy-Authorization between workers? It's still inter-worker communication but it's not replication traffic.


for instance in outbound_federation_restricted_to:
if instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
% (instance,)
)
self.outbound_federation_restricted_to.locations.append(
self.instance_map[instance]
)

def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
Expand Down
7 changes: 6 additions & 1 deletion synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,12 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
if reason.check(ResponseDone):
self.deferred.callback(self.length)
elif reason.check(PotentialDataLoss):
# stolen from https://github.com/twisted/treq/pull/49/files
# This applies to requests which don't set `Content-Length` or a
# `Transfer-Encoding` in the response because in this case the end of the
# response is indicated by the connection being closed, an event which may
# also be due to a transient network problem or other error. But since this
# behavior is expected of some servers (like YouTube), let's ignore it.
# Stolen from https://github.com/twisted/treq/pull/49/files
# http://twistedmatrix.com/trac/ticket/4840
self.deferred.callback(self.length)
else:
Expand Down
20 changes: 19 additions & 1 deletion synapse/http/connectproxyclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import base64
import logging
from typing import Optional, Union
Expand Down Expand Up @@ -39,8 +40,14 @@ class ProxyConnectError(ConnectError):
pass


@attr.s(auto_attribs=True)
class ProxyCredentials:
@abc.abstractmethod
def as_proxy_authorization_value(self) -> bytes:
raise NotImplementedError()


@attr.s(auto_attribs=True)
class BasicProxyCredentials(ProxyCredentials):
username_password: bytes

def as_proxy_authorization_value(self) -> bytes:
Expand All @@ -55,6 +62,17 @@ def as_proxy_authorization_value(self) -> bytes:
return b"Basic " + base64.encodebytes(self.username_password)


@attr.s(auto_attribs=True)
class BearerProxyCredentials(ProxyCredentials):
access_token: bytes

def as_proxy_authorization_value(self) -> bytes:
"""
Return the value for a Proxy-Authorization header (i.e. 'Bearer xxx').
"""
return b"Bearer " + self.access_token


@implementer(IStreamClientEndpoint)
class HTTPConnectProxyEndpoint:
"""An Endpoint implementation which will send a CONNECT request to an http proxy
Expand Down
Loading