From 4c16e500e9e8d653ffd6216e9e11141791a4032e Mon Sep 17 00:00:00 2001 From: Robert Aistleitner Date: Wed, 26 Apr 2023 01:05:20 +0330 Subject: [PATCH 1/2] Implement proper support for nested complex env values Co-authored-by: Hasan Ramezani --- pydantic_settings/sources.py | 26 ++++++++++++ tests/test_settings.py | 77 ++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/pydantic_settings/sources.py b/pydantic_settings/sources.py index fc039c25..47f59c9a 100644 --- a/pydantic_settings/sources.py +++ b/pydantic_settings/sources.py @@ -288,6 +288,16 @@ def _field_is_complex(self, field: FieldInfo) -> Tuple[bool, bool]: return True, allow_parse_failure + @staticmethod + def next_field(field: Optional[FieldInfo], key: str) -> Optional[FieldInfo]: + if not field or origin_is_union(get_origin(field.annotation)): + # no support for Unions of complex BaseSettings fields + return None + elif field.annotation and hasattr(field.annotation, 'model_fields') and field.annotation.model_fields.get(key): + return field.annotation.model_fields[key] + + return None + def explode_env_vars( self, field_name: str, field: FieldInfo, env_vars: Mapping[str, Optional[str]] ) -> Dict[str, Any]: @@ -307,8 +317,24 @@ def explode_env_vars( env_name_without_prefix = env_name[self.env_prefix_len :] _, *keys, last_key = env_name_without_prefix.split(self.env_nested_delimiter) env_var = result + target_field: Optional[FieldInfo] = field + for key in keys: + target_field = self.next_field(target_field, key) env_var = env_var.setdefault(key, {}) + + # get proper field with last_key + target_field = self.next_field(target_field, last_key) + + # check if env_val maps to a complex field and if so, parse the env_val + if target_field and env_val: + is_complex, allow_json_failure = self._field_is_complex(target_field) + if is_complex: + try: + env_val = json.loads(env_val) + except ValueError as e: + if not allow_json_failure: + raise e env_var[last_key] = env_val return result diff --git a/tests/test_settings.py b/tests/test_settings.py index 0f182371..5d89afe4 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -1390,3 +1390,80 @@ def settings_customise_sources( SettingsError, match='error getting value for field "top" from source "BadCustomSettingsSource"' ): Settings() + + +def test_nested_env_complex_values(env): + class SubSubModel(BaseSettings): + dvals: Dict + + class SubModel(BaseSettings): + vals: List[str] + sub_sub_model: SubSubModel + + class Cfg(BaseSettings): + sub_model: SubModel + + model_config = ConfigDict(env_prefix='cfg_', env_nested_delimiter='__') + + env.set('cfg_sub_model__vals', '["one", "two"]') + env.set('cfg_sub_model__sub_sub_model__dvals', '{"three": 4}') + + assert Cfg().model_dump() == {'sub_model': {'vals': ['one', 'two'], 'sub_sub_model': {'dvals': {'three': 4}}}} + + env.set('cfg_sub_model__vals', 'invalid') + with pytest.raises( + SettingsError, match='error parsing value for field "sub_model" from source "EnvSettingsSource"' + ): + Cfg() + + +def test_nested_env_nonexisting_field(env): + class SubModel(BaseSettings): + vals: List[str] + + class Cfg(BaseSettings): + sub_model: SubModel + + model_config = ConfigDict(env_prefix='cfg_', env_nested_delimiter='__') + + env.set('cfg_sub_model__foo_vals', '[]') + with pytest.raises(ValidationError): + Cfg() + + +def test_nested_env_nonexisting_field_deep(env): + class SubModel(BaseSettings): + vals: List[str] + + class Cfg(BaseSettings): + sub_model: SubModel + + model_config = ConfigDict(env_prefix='cfg_', env_nested_delimiter='__') + + env.set('cfg_sub_model__vals__foo__bar__vals', '[]') + with pytest.raises(ValidationError): + Cfg() + + +def test_nested_env_union_complex_values(env): + class SubModel(BaseSettings): + vals: Union[List[str], Dict[str, str]] + + class Cfg(BaseSettings): + sub_model: SubModel + + model_config = ConfigDict(env_prefix='cfg_', env_nested_delimiter='__') + + env.set('cfg_sub_model__vals', '["one", "two"]') + assert Cfg().model_dump() == {'sub_model': {'vals': ['one', 'two']}} + + env.set('cfg_sub_model__vals', '{"three": "four"}') + assert Cfg().model_dump() == {'sub_model': {'vals': {'three': 'four'}}} + + env.set('cfg_sub_model__vals', 'stringval') + with pytest.raises(ValidationError): + Cfg() + + env.set('cfg_sub_model__vals', '{"invalid": dict}') + with pytest.raises(ValidationError): + Cfg() From 2b184ef0adbb61f9f4550ad61095a2e3238a650d Mon Sep 17 00:00:00 2001 From: Hasan Ramezani Date: Wed, 26 Apr 2023 01:37:05 +0330 Subject: [PATCH 2/2] Add docstring --- pydantic_settings/sources.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pydantic_settings/sources.py b/pydantic_settings/sources.py index 47f59c9a..0f8268cc 100644 --- a/pydantic_settings/sources.py +++ b/pydantic_settings/sources.py @@ -290,6 +290,25 @@ def _field_is_complex(self, field: FieldInfo) -> Tuple[bool, bool]: @staticmethod def next_field(field: Optional[FieldInfo], key: str) -> Optional[FieldInfo]: + """ + Find the field in a sub model by key(env name) + + By having the following models: + + class SubSubModel(BaseSettings): + dvals: Dict + + class SubModel(BaseSettings): + vals: List[str] + sub_sub_model: SubSubModel + + class Cfg(BaseSettings): + sub_model: SubModel + + Then: + next_field(sub_model, 'vals') Returns the `vals` field of `SubModel` class + next_field(sub_model, 'sub_sub_model') Returns `sub_sub_model` field of `SubModel` class + """ if not field or origin_is_union(get_origin(field.annotation)): # no support for Unions of complex BaseSettings fields return None