Skip to content

Commit

Permalink
feat(modules): Mailpit Container
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverlambson committed Jun 30, 2024
1 parent 01d6c18 commit cfce31d
Show file tree
Hide file tree
Showing 5 changed files with 1,381 additions and 1,017 deletions.
2 changes: 2 additions & 0 deletions modules/mailpit/testcontainers/mailpit/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.. autoclass:: testcontainers.mailpit.MailpitContainer
.. title:: testcontainers.mailpit.MailpitContainer
190 changes: 190 additions & 0 deletions modules/mailpit/testcontainers/mailpit/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import tempfile
from datetime import UTC, datetime, timedelta
from typing import NamedTuple, Self

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import (
NoEncryption,
)
from cryptography.x509.oid import NameOID

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

logger = logging.getLogger(__name__)


class MailpitUser(NamedTuple):
username: str
password: str


class MailpitContainer(DockerContainer): # type: ignore[misc]
"""
Test container for Mailpit. The example below spins up a Mailpit server
Default configuration supports SMTP with STARTTLS and allows login with any
user/password.
Options:
- require_tls = True forces the use of SSL
- users = [MailpitUser("jane", "secret"), MailpitUser("ron", "pass2")] only
allows login with jane:secret or ron:pass2
Example:
.. doctest::
>>> import smtplib
>>> from testcontainers.mailpit import MailpitContainer
>>> with MailpitContainer() as mailpit_container:
... host_ip = mailpit_container.get_container_host_ip()
... host_port = mailpit_container.get_exposed_smtp_port()
... server = smtplib.SMTP(
... mailpit_container.get_container_host_ip(),
... mailpit_container.get_exposed_smtp_port(),
... )
... code, _ = server.login("any", "auth")
... assert code == 235 # authentication successful
... # use server.sendmail(...) to send emails
"""

def __init__( # type: ignore[no-untyped-def]
self,
image: str = "axllent/mailpit",
*,
smtp_port: int = 1025,
ui_port: int = 8025,
users: list[MailpitUser] | None = None,
require_tls: bool = False,
**kwargs,
) -> None:
super().__init__(image=image, **kwargs)
self.smtp_port = smtp_port
self.ui_port = ui_port

self.users = users if users is not None else []
self.auth_accept_any = int(len(self.users) == 0)

self.require_tls = int(require_tls)
self.tls_key, self.tls_cert = _generate_tls_certificates()
with tempfile.NamedTemporaryFile(delete=False, delete_on_close=False) as tls_key_file:
tls_key_file.write(self.tls_key)
self.tls_key_file = tls_key_file.name

with tempfile.NamedTemporaryFile(delete=False, delete_on_close=False) as tls_cert_file:
tls_cert_file.write(self.tls_cert)
self.tls_cert_file = tls_cert_file.name

@property
def _users_conf(self) -> str:
"""Mailpit user configuration string
"user:password user2:pass2 ...]
"""
return " ".join(f"{user.username}:{user.password}" for user in self.users)

def _configure(self) -> None:
if self.users:
self.with_env("MP_SMTP_AUTH", self._users_conf)
self.with_env("MP_SMTP_AUTH_ACCEPT_ANY", str(self.auth_accept_any))

self.with_env("MP_SMTP_REQUIRE_TLS", str(self.require_tls))

self.with_volume_mapping(self.tls_cert_file, "/cert.pem")
self.with_volume_mapping(self.tls_key_file, "/key.pem")
self.with_env("MP_SMTP_TLS_CERT", "/cert.pem")
self.with_env("MP_SMTP_TLS_KEY", "/key.pem")

self.with_exposed_ports(self.smtp_port, self.ui_port)

def start(self) -> Self:
super().start()
wait_for_logs(self, ".*accessible via.*")
return self

def stop(self, *args, **kwargs) -> None:
super().stop(*args, **kwargs)
os.remove(self.tls_key_file)
os.remove(self.tls_cert_file)

def get_exposed_smtp_port(self) -> int:
return int(self.get_exposed_port(self.smtp_port))


class _TLSCertificates(NamedTuple):
private_key: bytes
certificate: bytes


def _generate_tls_certificates() -> _TLSCertificates:
"""Generate self-signed TLS certificates as bytes"""
private_key = _generate_private_key()
certificate = _generate_self_signed_certificate(private_key)

private_key_bytes = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption(),
)
certificate_bytes = certificate.public_bytes(serialization.Encoding.PEM)

return _TLSCertificates(private_key_bytes, certificate_bytes)


def _generate_private_key() -> rsa.RSAPrivateKey:
"""Generate RSA private key"""
return rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
)


def _generate_self_signed_certificate(
private_key: rsa.RSAPrivateKey,
) -> x509.Certificate:
"""Generate self-signed certificate with RSA private key"""
domain = "mydomain.com"
subject = issuer = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "The Post Office"),
x509.NameAttribute(NameOID.COMMON_NAME, domain),
]
)

return (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(UTC))
.not_valid_after(datetime.now(UTC) + timedelta(days=3650)) # 10 years
.add_extension(
x509.SubjectAlternativeName([x509.DNSName(domain)]),
critical=False,
)
.sign(private_key, hashes.SHA256())
)
124 changes: 124 additions & 0 deletions modules/mailpit/testcontainers/mailpit/tests/test_mailpit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

import pytest

from testcontainers.mailpit import MailpitContainer, MailpitUser

_sender = "[email protected]"
_receivers = ["[email protected]"]
_msg = MIMEMultipart("mixed")
_msg["From"] = _sender
_msg["To"] = ", ".join(_receivers)
_msg["Subject"] = "test"
_msg.attach(MIMEText("test", "plain"))
_sendmail_args = (_sender, _receivers, _msg.as_string())


def test_mailpit_basic():
config = MailpitContainer()
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("any", "auth")
server.sendmail(*_sendmail_args)


def test_mailpit_starttls():
config = MailpitContainer()
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.starttls()
server.login("any", "auth")
server.sendmail(*_sendmail_args)


def test_mailpit_force_tls():
config = MailpitContainer(require_tls=True)
with config as mailpit:
server = smtplib.SMTP_SSL(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("any", "auth")
server.sendmail(*_sendmail_args)


def test_mailpit_basic_with_users_pass_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login(mailpit.users[0].username, mailpit.users[0].password)
server.sendmail(*_sendmail_args)


def test_mailpit_basic_with_users_fail_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with pytest.raises(smtplib.SMTPAuthenticationError):
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("not", "good")


def test_mailpit_starttls_with_users_pass_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.starttls()
server.login(mailpit.users[0].username, mailpit.users[0].password)
server.sendmail(*_sendmail_args)


def test_mailpit_starttls_with_users_fail_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users)
with pytest.raises(smtplib.SMTPAuthenticationError):
with config as mailpit:
server = smtplib.SMTP(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.starttls()
server.login("not", "good")


def test_mailpit_force_tls_with_users_pass_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users, require_tls=True)
with config as mailpit:
server = smtplib.SMTP_SSL(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login(mailpit.users[0].username, mailpit.users[0].password)
server.sendmail(*_sendmail_args)


def test_mailpit_force_tls_with_users_fail_auth():
users = [MailpitUser("user", "password")]
config = MailpitContainer(users=users, require_tls=True)
with pytest.raises(smtplib.SMTPAuthenticationError):
with config as mailpit:
server = smtplib.SMTP_SSL(
mailpit.get_container_host_ip(),
mailpit.get_exposed_smtp_port(),
)
server.login("not", "good")
Loading

0 comments on commit cfce31d

Please sign in to comment.