Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dotenv run resolve path #507

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jobs:
matrix:
os:
- ubuntu-latest
- windows-latest
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", pypy3.9, pypy3.10]

steps:
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ defined in the following list:
- Default value, if provided.
- Empty string.

Note that on Windows environment variable names in `os.environ` are
[always uppercase](https://docs.python.org/3/library/os.html#os.environ)!
This may lead to some unexpected expansions if your variables are not all uppercase.

## Related Projects

- [Honcho](https://github.com/nickstenning/honcho) - For managing
Expand Down
6 changes: 6 additions & 0 deletions src/dotenv/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
import shlex
import shutil
import sys
from contextlib import contextmanager
from subprocess import Popen
Expand Down Expand Up @@ -189,6 +190,11 @@ def run_command(command: List[str], env: Dict[str, str]) -> int:
cmd_env = os.environ.copy()
cmd_env.update(env)

# Resolve path in a consistent way
app = shutil.which(command[0])
if app is not None:
command = (app,) + command[1:]

p = Popen(command,
universal_newlines=True,
bufsize=0,
Expand Down
118 changes: 97 additions & 21 deletions src/dotenv/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from collections import OrderedDict
from contextlib import contextmanager
from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple,
Union)
Union, overload)

from .parser import Binding, parse_stream
from .variables import parse_variables
Expand Down Expand Up @@ -40,6 +40,7 @@ def __init__(
verbose: bool = False,
encoding: Optional[str] = None,
interpolate: bool = True,
single_quotes_expand: bool = True,
override: bool = True,
) -> None:
self.dotenv_path: Optional[StrPath] = dotenv_path
Expand All @@ -48,6 +49,7 @@ def __init__(
self.verbose: bool = verbose
self.encoding: Optional[str] = encoding
self.interpolate: bool = interpolate
self.single_quotes_expand: bool = single_quotes_expand
self.override: bool = override

@contextmanager
Expand All @@ -70,20 +72,31 @@ def dict(self) -> Dict[str, Optional[str]]:
if self._dict:
return self._dict

raw_values = self.parse()

if self.interpolate:
self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
bindings = self.parse_to_bindings()
self._dict = OrderedDict(
_resolve_bindings(
bindings,
override=self.override,
single_quotes_expand=self.single_quotes_expand,
)
)
else:
raw_values = self.parse()
self._dict = OrderedDict(raw_values)

return self._dict

def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
def parse_to_bindings(self) -> Iterator[Binding]:
with self._get_stream() as stream:
for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
if mapping.key is not None:
yield mapping.key, mapping.value
yield mapping

def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
for mapping in self.parse_to_bindings():
assert mapping.key is not None
yield mapping.key, ''.join([v.value for v in mapping.value]) if mapping.value is not None else None

def set_as_environment_variables(self) -> bool:
"""
Expand All @@ -93,7 +106,8 @@ def set_as_environment_variables(self) -> bool:
return False

for k, v in self.dict().items():
if k in os.environ and not self.override:
key_present = k in os.environ or (os.name == 'nt' and k.upper() in os.environ)
if key_present and not self.override:
continue
if v is not None:
os.environ[k] = v
Expand Down Expand Up @@ -229,31 +243,89 @@ def unset_key(
return removed, key_to_unset


def _resolve_bindings(
bindings: Iterable[Binding],
override: bool,
single_quotes_expand: bool,
) -> Mapping[str, Optional[str]]:
new_values: Dict[str, Optional[str]] = {}

for binding in bindings:
name = binding.key
if name is None:
continue

result: Optional[str] = None
if binding.value is not None:
result = ''
for quote, value in binding.value:
if not single_quotes_expand and quote == "'":
result += value
else:
result += resolve_variable(value, new_values, override)

new_values[name] = result

return new_values


def resolve_variables(
values: Iterable[Tuple[str, Optional[str]]],
override: bool,
) -> Mapping[str, Optional[str]]:
"""
Expand POSIX variables present in the provided sequence of key-value pairs.

Resolved `values` and `os.environ` are used as defined variables.
New values take precedence over `os.environ` if `override` is True.
"""
new_values: Dict[str, Optional[str]] = {}

for (name, value) in values:
if value is None:
result = None
else:
atoms = parse_variables(value)
env: Dict[str, Optional[str]] = {}
if override:
env.update(os.environ) # type: ignore
env.update(new_values)
else:
env.update(new_values)
env.update(os.environ) # type: ignore
result = "".join(atom.resolve(env) for atom in atoms)

new_values[name] = result
new_values[name] = resolve_variable(value, new_values, override)

return new_values


@overload
def resolve_variable(
value: str,
variables: Dict[str, Optional[str]],
override: bool) -> str: ...


@overload
def resolve_variable(
value: None,
variables: Dict[str, Optional[str]],
override: bool) -> None: ...


def resolve_variable(
value: Optional[str],
variables: Dict[str, Optional[str]],
override: bool
) -> Optional[str]:
"""
Expand POSIX variables present in the provided value.

`variables` and `os.environ` are used as defined variables.
`variables` take precedence over `os.environ` if `override` is True.
"""
if value is None:
return value

atoms = parse_variables(value)
env: Dict[str, Optional[str]] = {}
if override:
env.update(os.environ) # type: ignore
env.update(variables)
else:
env.update(variables)
env.update(os.environ) # type: ignore
return "".join(atom.resolve(env) for atom in atoms)


def _walk_to_root(path: str) -> Iterator[str]:
"""
Yield directories starting from the given directory up to the root
Expand Down Expand Up @@ -324,6 +396,7 @@ def load_dotenv(
verbose: bool = False,
override: bool = False,
interpolate: bool = True,
single_quotes_expand: bool = True,
encoding: Optional[str] = "utf-8",
) -> bool:
"""Parse a .env file and then load all the variables found as environment variables.
Expand All @@ -350,6 +423,7 @@ def load_dotenv(
stream=stream,
verbose=verbose,
interpolate=interpolate,
single_quotes_expand=single_quotes_expand,
override=override,
encoding=encoding,
)
Expand All @@ -361,6 +435,7 @@ def dotenv_values(
stream: Optional[IO[str]] = None,
verbose: bool = False,
interpolate: bool = True,
single_quotes_expand: bool = True,
encoding: Optional[str] = "utf-8",
) -> Dict[str, Optional[str]]:
"""
Expand All @@ -387,6 +462,7 @@ def dotenv_values(
stream=stream,
verbose=verbose,
interpolate=interpolate,
single_quotes_expand=single_quotes_expand,
override=True,
encoding=encoding,
).dict()
53 changes: 40 additions & 13 deletions src/dotenv/parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import codecs
import re
from typing import (IO, Iterator, Match, NamedTuple, Optional, # noqa:F401
from typing import (IO, Iterator, List, Match, NamedTuple, Optional, # noqa:F401
Pattern, Sequence, Tuple)


Expand All @@ -17,9 +17,10 @@ def make_regex(string: str, extra_flags: int = 0) -> Pattern[str]:
_equal_sign = make_regex(r"(=[^\S\r\n]*)")
_single_quoted_value = make_regex(r"'((?:\\'|[^'])*)'")
_double_quoted_value = make_regex(r'"((?:\\"|[^"])*)"')
_unquoted_value = make_regex(r"([^\r\n]*)")
_unquoted_value = make_regex(r"((?:\\'|\\\"|[^'\"\r\n])*)")
_comment = make_regex(r"(?:[^\S\r\n]*#[^\r\n]*)?")
_end_of_line = make_regex(r"[^\S\r\n]*(?:\r\n|\n|\r|$)")
_until_end_of_line = make_regex(r"[^\r\n]*")
_rest_of_line = make_regex(r"[^\r\n]*(?:\r|\n|\r\n)?")
_double_quote_escapes = make_regex(r"\\[\\'\"abfnrtv]")
_single_quote_escapes = make_regex(r"\\[\\']")
Expand All @@ -30,9 +31,14 @@ class Original(NamedTuple):
line: int


class ValuePart(NamedTuple):
quote: Optional[str]
value: str


class Binding(NamedTuple):
key: Optional[str]
value: Optional[str]
value: Optional[List[ValuePart]]
original: Original
error: bool

Expand Down Expand Up @@ -113,21 +119,27 @@ def parse_key(reader: Reader) -> Optional[str]:
return key


def parse_unquoted_value(reader: Reader) -> str:
def parse_unquoted_value(reader: Reader) -> Tuple[str, bool]:
(part,) = reader.read_regex(_unquoted_value)
return re.sub(r"\s+#.*", "", part).rstrip()
stripped_comment = re.sub(r"\s+#.*", "", part)
return stripped_comment, part != stripped_comment


def peek_quote(reader: Reader) -> Optional[str]:
char = reader.peek(1)
return char if char in [u'"', u"'"] else None


def parse_value(reader: Reader) -> str:
def parse_value(reader: Reader) -> Tuple[str, bool]:
char = reader.peek(1)
if char == u"'":
(value,) = reader.read_regex(_single_quoted_value)
return decode_escapes(_single_quote_escapes, value)
return decode_escapes(_single_quote_escapes, value), False
elif char == u'"':
(value,) = reader.read_regex(_double_quoted_value)
return decode_escapes(_double_quote_escapes, value)
return decode_escapes(_double_quote_escapes, value), False
elif char in (u"", u"\n", u"\r"):
return u""
return u"", False
else:
return parse_unquoted_value(reader)

Expand All @@ -146,16 +158,31 @@ def parse_binding(reader: Reader) -> Binding:
reader.read_regex(_export)
key = parse_key(reader)
reader.read_regex(_whitespace)

strings: Optional[List[ValuePart]] = None
if reader.peek(1) == "=":
reader.read_regex(_equal_sign)
value: Optional[str] = parse_value(reader)
else:
value = None

strings = []
start_comment: bool = False
while reader.has_next() and not reader.peek(1) in (u"", u"\n", u"\r") and not start_comment:
quote: Optional[str] = peek_quote(reader)
value, start_comment = parse_value(reader)
strings.append(ValuePart(quote, value))
if strings and strings[-1].quote is None:
stripped = strings[-1].value.rstrip()
if len(stripped) > 0:
strings[-1] = ValuePart(None, stripped)
else:
strings = strings[:-1]
if start_comment:
reader.read_regex(_until_end_of_line)

reader.read_regex(_comment)
reader.read_regex(_end_of_line)
return Binding(
key=key,
value=value,
value=strings,
original=reader.get_marked(),
error=False,
)
Expand Down
Loading
Loading