Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added time encoder #912

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions beanie/odm/utils/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,65 @@
from beanie.odm.fields import Link, LinkTypes
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2, get_model_fields

UNIX_TIME_START = datetime.datetime(1970, 1, 1, 0, 0, 0, 0)


def _compatct_time_encoder(time: datetime.time) -> str:
# re-implementation of speed date in python
# https://github.com/pydantic/speedate/blob/aad1d9117a05618ae883e20ae179c582eb998eeb /src/time.rs#L41 and
# pydantic https://github.com/pydantic/pydantic-core/blob/f636403c2e8169bcbb94aa71a0727d76076f7e6e/src/input
# /datetime.rs#L177
if time.microsecond != 0:
time_string = "{:02d}:{:02d}:{:02d}.{:06d}".format(
time.hour, time.minute, time.second, time.microsecond
)
elif time.second != 0:
time_string = "{:02d}:{:02d}:{:02d}".format(
time.hour, time.minute, time.second
)
else:
time_string = "{:02d}:{:02d}".format(time.hour, time.minute)

tz_offset = None
if time.tzinfo is not None: # has timezone
# check if the offset is fixed no matter the datetime
offset_delta = time.tzinfo.utcoffset(None)
if (
offset_delta is None
): # is ambiguous resolve base on 1.1.1970 to be consistent across time
try:
offset_delta = time.tzinfo.utcoffset(UNIX_TIME_START)
except Exception as e:
raise ValueError from e
if offset_delta is not None: # is not ambiguous
tz_offset = round(offset_delta.total_seconds())
else: # even if stuffing time into may work is wrong see
raise ValueError(
"timezone is not fixed and cannot be resolved with datetime,"
f"contact the maintainer of {type(time.tzinfo).__name__} for support"
)
else: # is known without datetime
tz_offset = round(offset_delta.total_seconds())

if tz_offset is not None:
if tz_offset == 0:
time_string += "Z"
else:
is_negative = tz_offset < 0
hours = abs(tz_offset // 3600)
minutes = abs(tz_offset % 3600) // 60

# since minutes are negative we need to subtract them from 60 and roll one hour back to be correct
if is_negative and minutes != 0:
hours -= 1
minutes = 60 - minutes

sign = "-" if is_negative else "+"
time_string += "{}{:02d}:{:02d}".format(sign, hours, minutes)

return time_string


SingleArgCallable = Callable[[Any], Any]
DEFAULT_CUSTOM_ENCODERS: MutableMapping[type, SingleArgCallable] = {
ipaddress.IPv4Address: str,
Expand All @@ -37,8 +96,10 @@
pathlib.PurePath: str,
pydantic.SecretBytes: pydantic.SecretBytes.get_secret_value,
pydantic.SecretStr: pydantic.SecretStr.get_secret_value,
# datetimes
datetime.date: lambda d: datetime.datetime.combine(d, datetime.time.min),
datetime.timedelta: operator.methodcaller("total_seconds"),
datetime.time: _compatct_time_encoder,
enum.Enum: operator.attrgetter("value"),
Link: operator.attrgetter("ref"),
bytes: bson.Binary,
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ test = [
"pydantic-settings>=2",
"pydantic-extra-types>=2",
"pydantic[email]",
"pytz",
"tzdata"
]
doc = [
"Pygments>=2.8.0",
Expand Down
2 changes: 2 additions & 0 deletions tests/odm/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DocNonRoot,
DocumentForEncodingTest,
DocumentForEncodingTestDate,
DocumentForEncodingTestTime,
DocumentMultiModelOne,
DocumentMultiModelTwo,
DocumentTestModel,
Expand Down Expand Up @@ -291,6 +292,7 @@ async def init(db):
DocumentWithLinkForNesting,
DocumentWithBackLinkForNesting,
LongSelfLink,
DocumentForEncodingTestTime,
]
await init_beanie(
database=db,
Expand Down
4 changes: 4 additions & 0 deletions tests/odm/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1107,3 +1107,7 @@ class LongSelfLink(Document):

class Settings:
max_nesting_depth = 50


class DocumentForEncodingTestTime(Document):
time_field: datetime.time
113 changes: 112 additions & 1 deletion tests/odm/test_encoder.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import re
from datetime import date, datetime
from datetime import date, datetime, time
from datetime import timezone as dt_timezone
from typing import Union
from uuid import uuid4

import pytest
from bson import Binary, Regex
from pydantic import AnyUrl

# support for older python versions (will cause to test some things twice)
from pytz import UTC, all_timezones, timezone

from beanie.odm.utils.encoder import Encoder
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2
from tests.odm.models import (
Child,
DocumentForEncodingTest,
DocumentForEncodingTestDate,
DocumentForEncodingTestTime,
DocumentWithComplexDictKey,
DocumentWithDecimalField,
DocumentWithHttpUrlField,
Expand All @@ -21,6 +27,13 @@
SampleWithMutableObjects,
)

has_zone_info = True
try:
from zoneinfo import ZoneInfo, available_timezones
except ImportError:
has_zone_info = False
ZoneInfo = timezone


async def test_encode_datetime():
assert isinstance(Encoder().encode(datetime.now()), datetime)
Expand Down Expand Up @@ -169,3 +182,101 @@ async def test_dict_with_complex_key():

assert isinstance(new_doc.dict_field, dict)
assert new_doc.dict_field.get(uuid) == dt


def is_same_type_or_subtype(obj1, obj2):
return isinstance(obj1, type(obj2)) or isinstance(obj2, type(obj1))


def assert_time_equal(to_test: time, reference: time):
if to_test.tzinfo is not None: # tz
if reference.tzinfo.utcoffset(None) is None:
now = datetime(
1970, 1, 1, 0, 0, 0, 0
) # date dependsent because of daylight saving time

check_tz_info(to_test, reference, now)
# compare without info
assert to_test.replace(tzinfo=None) == reference.replace(
tzinfo=None
)
else:
check_tz_info(to_test, reference)
assert to_test.replace(tzinfo=None) == reference.replace(
tzinfo=None
)
else:
assert to_test == reference


def check_tz_info(
to_test: time, reference: time, when: Union[datetime, None] = None
):
# up to 1-minute (not included) difference is allowed by serialization standard used by pydantic
assert (
to_test.tzinfo.utcoffset(when).total_seconds() // 60
== reference.tzinfo.utcoffset(when).total_seconds() // 60
)


async def inner_test_time(test_time: time):
doc = DocumentForEncodingTestTime(time_field=test_time)
await doc.insert()
new_doc = await DocumentForEncodingTestTime.get(doc.id)
assert_time_equal(new_doc.time_field, doc.time_field)
assert isinstance(new_doc.time_field, time)
assert_time_equal(new_doc.time_field, test_time)


@pytest.mark.parametrize(
"test_time",
[
time(12),
time(12, fold=1),
time(12, 3),
time(12, 3, fold=1),
time(12, 4, 5),
time(12, 4, 5, fold=1),
time(12, 4, 5, 123456),
time(12, 4, 5, 123456, fold=1),
time(12, 4, 5, 123456, tzinfo=UTC),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=UTC, fold=1),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=timezone("Europe/Prague"), fold=1),
time(12, 4, 5, 123456, tzinfo=dt_timezone.utc),
time(12, 4, 5, 123456, tzinfo=dt_timezone.utc, fold=1),
time(12, 4, 5, 123456, tzinfo=ZoneInfo("Europe/Prague")),
time(12, 4, 5, 123456, tzinfo=ZoneInfo("Europe/Prague"), fold=1),
],
)
async def test_encode_time_with_tz(test_time: time):
await inner_test_time(test_time)


if has_zone_info:
tz = list(available_timezones())
07pepa marked this conversation as resolved.
Show resolved Hide resolved
tz.sort()

@pytest.mark.parametrize("tz_string", tz)
async def test_encode_time_exhaustive_timezones_zone_info(tz_string: str):
await inner_test_time(
time(12, 4, 5, 123456, tzinfo=ZoneInfo(tz_string))
)


# folowing causes pytz.exceptions.NonExistentTimeError
pytz_unsupported = (
"America/Bahia_Banderas",
"America/Hermosillo",
"America/Mazatlan",
"Mexico/BajaSur",
)


@pytest.mark.parametrize(
"tz_string",
list(filter(lambda x: x not in pytz_unsupported, all_timezones)),
)
async def test_encode_time_exhaustive_timezones_pytz(tz_string: str):
await inner_test_time(time(12, 4, 5, 123456, tzinfo=timezone(tz_string)))