Skip to content

Commit

Permalink
Avoid pendulum.from_timestamp usage (#37160)
Browse files Browse the repository at this point in the history
(cherry picked from commit 8374105)
  • Loading branch information
Taragolis authored and potiuk committed Feb 7, 2024
1 parent 5fdb578 commit ff4fa4f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 8 deletions.
6 changes: 5 additions & 1 deletion airflow/providers/elasticsearch/log/es_json_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from datetime import datetime

import pendulum

from airflow.utils.log.json_formatter import JSONFormatter
Expand All @@ -30,7 +32,9 @@ class ElasticsearchJSONFormatter(JSONFormatter):

def formatTime(self, record, datefmt=None):
"""Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone."""
dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone())
# TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local")
# as soon as min Airflow 2.9.0
dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone())
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
Expand Down
7 changes: 3 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import attrs
import lazy_object_proxy
import pendulum
from dateutil import relativedelta
from pendulum.tz.timezone import FixedTimezone, Timezone

Expand Down Expand Up @@ -65,7 +64,7 @@
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.timezone import parse_timezone
from airflow.utils.timezone import from_timestamp, parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -563,7 +562,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
elif type_ == DAT.OP:
return SerializedBaseOperator.deserialize_operator(var)
elif type_ == DAT.DATETIME:
return pendulum.from_timestamp(var)
return from_timestamp(var)
elif type_ == DAT.POD:
if not _has_kubernetes():
raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
Expand Down Expand Up @@ -607,7 +606,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
else:
raise TypeError(f"Invalid type {type_!s} in deserialization.")

_deserialize_datetime = pendulum.from_timestamp
_deserialize_datetime = from_timestamp
_deserialize_timezone = parse_timezone

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/log/timezone_aware.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import logging

import pendulum
from airflow.utils import timezone


class TimezoneAware(logging.Formatter):
Expand All @@ -39,7 +39,7 @@ def formatTime(self, record, datefmt=None):
This returns the creation time of the specified LogRecord in ISO 8601
date and time format in the local time zone.
"""
dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone())
dt = timezone.from_timestamp(record.created, tz="local")
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
Expand Down
22 changes: 22 additions & 0 deletions airflow/utils/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
if TYPE_CHECKING:
from pendulum.tz.timezone import FixedTimezone, Timezone

from airflow.typing_compat import Literal

_PENDULUM3 = pendulum.__version__.startswith("3")
# UTC Timezone as a tzinfo instance. Actual value depends on pendulum version:
# - Timezone("UTC") in pendulum 3
Expand Down Expand Up @@ -305,3 +307,23 @@ def local_timezone() -> FixedTimezone | Timezone:
:meta private:
"""
return pendulum.tz.local_timezone()


def from_timestamp(
timestamp: int | float, tz: str | FixedTimezone | Timezone | Literal["local"] = utc
) -> DateTime:
"""
Parse timestamp and return DateTime in a given time zone.
:param timestamp: epoch time in seconds.
:param tz: In which timezone should return a resulting object.
Could be either one of pendulum timezone, IANA timezone or `local` literal.
:meta private:
"""
result = coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
if tz != utc or tz != "UTC":
if isinstance(tz, str) and tz.lower() == "local":
tz = local_timezone()
result = result.in_timezone(tz)
return result
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,8 @@ combine-as-imports = true
"airflow.AirflowException".msg = "Use airflow.exceptions.AirflowException instead."
"airflow.Dataset".msg = "Use airflow.datasets.Dataset instead."
"airflow.models.baseoperator.BaseOperatorLink".msg = "Use airflow.models.baseoperatorlink.BaseOperatorLink"
# Uses deprecated in Python 3.12 `datetime.datetime.utcfromtimestamp`
"pendulum.from_timestamp".msg = "Use airflow.utils.timezone.from_timestamp"

[tool.ruff.flake8-tidy-imports]
# Ban certain modules from being imported at module level, instead requiring
Expand Down
46 changes: 45 additions & 1 deletion tests/utils/test_timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import pendulum
import pytest
from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone

from airflow.utils import timezone
from airflow.utils.timezone import coerce_datetime, parse_timezone
Expand Down Expand Up @@ -156,3 +156,47 @@ def test_parse_timezone_offset(tz_offset: int, expected_offset, expected_name):
assert tz.offset == expected_offset
assert tz.name == expected_name
assert parse_timezone(tz_offset) is tz


@pytest.mark.parametrize(
"tz",
[
pytest.param(None, id="implicit"),
pytest.param(timezone.utc, id="explicit"),
pytest.param("UTC", id="utc-literal"),
],
)
def test_from_timestamp_utc(tz):
from_ts = timezone.from_timestamp(0) if tz is None else timezone.from_timestamp(0, tz=tz)
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
assert from_ts.tzinfo == timezone.utc


@pytest.mark.parametrize("tz", ["local", "LOCAL"])
def test_from_timestamp_local(tz):
local_tz = timezone.local_timezone()
from_ts = timezone.from_timestamp(0, tz=tz)
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
assert from_ts.tzinfo == local_tz


@pytest.mark.parametrize(
"tz, iana_timezone",
[
pytest.param(Timezone("Europe/Paris"), "Europe/Paris", id="pendulum-timezone"),
pytest.param("America/New_York", "America/New_York", id="IANA-timezone"),
],
)
def test_from_timestamp_iana_timezones(tz, iana_timezone):
from_ts = timezone.from_timestamp(0, tz=tz)
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
# In pendulum 2 there is a problem with compare tzinfo object (caching?), so we check the name
assert from_ts.tzinfo.name == iana_timezone
assert isinstance(from_ts.tzinfo, Timezone)


@pytest.mark.parametrize("utc_offset", [3600, -7200])
def test_from_timestamp_fixed_timezone(utc_offset):
from_ts = timezone.from_timestamp(0, tz=FixedTimezone(utc_offset))
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
assert from_ts.utcoffset() == datetime.timedelta(seconds=utc_offset)

0 comments on commit ff4fa4f

Please sign in to comment.