Skip to content

Commit

Permalink
Add test to already existing test
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Mar 21, 2024
1 parent 4f9db46 commit 6937000
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 75 deletions.
1 change: 0 additions & 1 deletion airflow/serialization/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def is_jsonable(x):
return True

max_size = conf.getint("core", "max_templated_field_size")

if not is_jsonable(template_field):
serialized = str(template_field)
if len(serialized) > max_size:
Expand Down
90 changes: 16 additions & 74 deletions tests/models/test_renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
from unittest import mock

import pytest
from sqlalchemy import select

from airflow import settings
from airflow.decorators import task as task_decorator
from airflow.models import Variable
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -64,6 +62,14 @@ def __ne__(self, other):
return not self.__eq__(other)


class LargeStrObject:
def __init__(self):
self.a = "a" * 2560

def __str__(self):
return self.a


class TestRenderedTaskInstanceFields:
"""Unit tests for RenderedTaskInstanceFields."""

Expand Down Expand Up @@ -114,6 +120,14 @@ def teardown_method(self):
"{'att3': '{{ task.task_id }}', 'att4': '{{ task.task_id }}', 'template_fields': ['att3']}), "
"'template_fields': ['nested1']})",
),
(
"a" * 2560,
"Value removed due to size. You can change this behaviour in [core]max_templated_field_size",
),
(
LargeStrObject(),
"Value removed due to size. You can change this behaviour in [core]max_templated_field_size",
),
],
)
def test_get_templated_fields(self, templated_field, expected_rendered_field, dag_maker):
Expand Down Expand Up @@ -321,75 +335,3 @@ def test_redact(self, redact, dag_maker):
"env": "val 2",
"cwd": "val 3",
}

def test_large_string_is_not_stored(self, dag_maker, session):
"""
Test that large string is not stored in the database
"""
large_string = "a" * 2560
with dag_maker("test_large_objects_are_not_stored"):

@task_decorator
def gentask():
return large_string

@task_decorator
def consumer_task(value):
return value

consumer_task(gentask())

dr = dag_maker.create_dagrun()
ti, ti2 = dr.task_instances
ti.xcom_push(value=large_string, key="return_value")
rtif = RTIF(ti=ti2)
rtif.write(session=session)
session.flush()
rtif = session.query(RTIF).filter(RTIF.dag_id == rtif.dag_id, RTIF.task_id == rtif.task_id).first()

assert rtif.rendered_fields == {
"op_args": "Value removed due to size. "
"You can change this behaviour in [core]max_templated_field_size",
"op_kwargs": {},
"templates_dict": None,
}

def test_large_objects_are_not_stored(self, dag_maker, session):
"""
Test that large objects are not stored in the database
"""

class A:
def __init__(self):
self.a = "a" * 2560

def __str__(self):
return self.a

large_data = A()

with dag_maker("test_large_objects_are_not_stored"):

@task_decorator
def gentask():
return large_data

@task_decorator
def consumer_task(value):
return value

consumer_task(gentask())

dr = dag_maker.create_dagrun()
ti, ti2 = dr.task_instances
ti.xcom_push(value=str(large_data), key="return_value")
rtif = RTIF(ti=ti2)
rtif.write(session=session)
session.flush()
rtif = session.scalar(select(RTIF).where(RTIF.dag_id == rtif.dag_id, RTIF.task_id == rtif.task_id))
assert rtif.rendered_fields == {
"op_args": "Value removed due to size. "
"You can change this behaviour in [core]max_templated_field_size",
"op_kwargs": {},
"templates_dict": None,
}

0 comments on commit 6937000

Please sign in to comment.