diff --git a/src/openjd/sessions/_embedded_files.py b/src/openjd/sessions/_embedded_files.py index ee4c1cc9..45535e94 100644 --- a/src/openjd/sessions/_embedded_files.py +++ b/src/openjd/sessions/_embedded_files.py @@ -77,8 +77,10 @@ def write_file_for_user( if user is not None: user = cast(WindowsSessionUser, user) process_user = get_process_user() - WindowsPermissionHelper.set_permissions_full_control( - str(filename), [process_user, user.user] + WindowsPermissionHelper.set_permissions( + str(filename), + principals_full_control=[process_user], + principals_modify_access=[user.user], ) diff --git a/src/openjd/sessions/_session.py b/src/openjd/sessions/_session.py index 77f7c549..05207ee1 100644 --- a/src/openjd/sessions/_session.py +++ b/src/openjd/sessions/_session.py @@ -162,6 +162,23 @@ class Session(object): module's LOG Logger at log level INFO. The LogRecords sent to the log have an extra attribute named "session_id" whose value is the session_id that was passed to the constructor of the Session. + + Each Session has its own temporary working directory, referred to as the Session + Working Directory. Instantiating this class creates this directory for the duration + of the Session, and you are expected to call the cleanup() method on your Session + to delete that directory when done (this is done automatically if using the Session + as a context manager). + On POSIX: + - The Session Working Directory's owner is the process owner. + - If a PosixSessionUser is provided, then the group of that user is the group owner + of the the directory. + On Windows: + - All files and directories within the Session Working Directory inherit the + directory's ACL. + - The Session Working Directory's ACL is set so that the process owner has full + control + - If a WindowsSessionUser is provided then the user within that SessionUser + is also given Modify access. """ _state: SessionState diff --git a/src/openjd/sessions/_session_user.py b/src/openjd/sessions/_session_user.py index 3fe46293..1561fe03 100644 --- a/src/openjd/sessions/_session_user.py +++ b/src/openjd/sessions/_session_user.py @@ -112,9 +112,16 @@ class WindowsSessionUser(SessionUser): includes a logon session obtained by ssh-ing into the host), then you must instantiate this class with a username + logon_token; providing a password is not allowed in Session 0. To create a logon_token, you will want to look in to the LogonUser family of Win32 system APIs. + + The user provided in this class directly influences the Directory ACL of the Session Working + Directory that is created. The created directory: + 1. Has Full Control by the owner of the calling process; and + 2. Has Modify access by the provided user. + The Session working directory will also be set so that all child directories and files + inherit these permissions. """ - __slots__ = ("user", "group", "password", "logon_token") + __slots__ = ("user", "password", "logon_token") user: str """ @@ -123,13 +130,6 @@ class with a username + logon_token; providing a password is not allowed in Sess ex: localUser, domain\\domainUser """ - group: str - """ - Group name of the identity to run the Session's subprocesses under. - This can be just a group name for a local group, or a domain group in down-level logon form. - ex: localGroup, domain\\domainGroup - """ - password: Optional[str] """ Password of the identity to run the Session's subprocess(es) under. @@ -146,7 +146,6 @@ def __init__( self, user: str, *, - group: Optional[str] = None, password: Optional[str] = None, logon_token: Optional[HANDLE] = None, ) -> None: @@ -158,12 +157,6 @@ def __init__( or a domain's UPN. ex: localUser, domain\\domainUser, domainUser@domain.com - group (Optional[str]): - Group name of the identity to run the Session's subprocesses under. - This can be just a group name for a local group, or a domain group in down-level format. - ex: localGroup, domain\\domainGroup - Defaults to the username if not provided. - password (Optional[str]): Password of the identity to run the Session's subprocess under. This argument is mutually-exclusive with the "logon_token" argument. @@ -184,7 +177,6 @@ def __init__( ) self.user = user - self.group = group if group else user domain, username_without_domain = self._split_domain_and_username(user) diff --git a/src/openjd/sessions/_tempdir.py b/src/openjd/sessions/_tempdir.py index c1cabd6b..51a67635 100644 --- a/src/openjd/sessions/_tempdir.py +++ b/src/openjd/sessions/_tempdir.py @@ -122,12 +122,10 @@ def __init__( elif is_windows(): user = cast(WindowsSessionUser, user) try: - principal_to_permit = user.group - - process_user = get_process_user() - - WindowsPermissionHelper.set_permissions_full_control( - str(self.path), [principal_to_permit, process_user] + WindowsPermissionHelper.set_permissions( + str(self.path), + principals_full_control=[get_process_user()], + principals_modify_access=[user.user], ) except Exception as err: raise RuntimeError( diff --git a/src/openjd/sessions/_windows_permission_helper.py b/src/openjd/sessions/_windows_permission_helper.py index 24ebd531..624fe276 100644 --- a/src/openjd/sessions/_windows_permission_helper.py +++ b/src/openjd/sessions/_windows_permission_helper.py @@ -13,14 +13,20 @@ class WindowsPermissionHelper: """ @staticmethod - def set_permissions_full_control(file_path, principals_to_permit): + def set_permissions( + file_path: str, + *, + principals_full_control: list[str] = [], + principals_modify_access: list[str] = [], + ): """ - Grants full control over the object at file_path to all principals in principals_to_permit. + Grants access control over the object at file_path. Sets flags so both child files and directories inherit these permissions. Arguments: file_path (str): The path to the file or directory. - principals_to_permit (List[str]): The names of the principals to permit. + principals_full_control (List[str]): The names of the principals to permit Full Control access. + principals_modify_access (List[str]): The names of the principals to permit Modify access Raises: RuntimeError if there is a problem modifying the security attributes. @@ -28,14 +34,31 @@ def set_permissions_full_control(file_path, principals_to_permit): try: # We don't want to propagate existing permissions, so create a new DACL dacl = win32security.ACL() - for principal in principals_to_permit: + for principal in principals_full_control: user_or_group_sid, _, _ = win32security.LookupAccountName(None, principal) # Add an ACE to the DACL giving the principal full control and enabling inheritance of the ACE dacl.AddAccessAllowedAceEx( win32security.ACL_REVISION, ntsecuritycon.OBJECT_INHERIT_ACE | ntsecuritycon.CONTAINER_INHERIT_ACE, - ntsecuritycon.FILE_ALL_ACCESS, + ntsecuritycon.FILE_ALL_ACCESS, # = 0x1F01FF + user_or_group_sid, + ) + for principal in principals_modify_access: + user_or_group_sid, _, _ = win32security.LookupAccountName(None, principal) + + # Add an ACE to the DACL giving the principal full control and enabling inheritance of the ACE + dacl.AddAccessAllowedAceEx( + win32security.ACL_REVISION, + ntsecuritycon.OBJECT_INHERIT_ACE | ntsecuritycon.CONTAINER_INHERIT_ACE, + # Values of these constants defined in winnt.h + # Constant value after ORs: 0x1301FF + # Delta from FILE_ALL_ACCESS is 0xC0000 = WRITE_DAC(0x40000) | WRITE_OWNER(0x80000) + ntsecuritycon.FILE_GENERIC_READ # = 0x120089 + | ntsecuritycon.FILE_GENERIC_WRITE # = 0x120116 + | ntsecuritycon.FILE_GENERIC_EXECUTE # = 0x1200A0 + | ntsecuritycon.DELETE # = 0x10000 + | ntsecuritycon.FILE_DELETE_CHILD, # = 0x0040 user_or_group_sid, ) @@ -54,6 +77,8 @@ def set_permissions_full_control(file_path, principals_to_permit): sd.SetSecurityDescriptorDacl(1, dacl, 0) # Set the security descriptor to the tempdir + # Note: This completely overwrites the DACL; so, if we don't provide a permission above then + # the DACL doesn't have it. win32security.SetFileSecurity( str(file_path), win32security.DACL_SECURITY_INFORMATION, sd ) diff --git a/test/openjd/sessions/test_embedded_files.py b/test/openjd/sessions/test_embedded_files.py index 8cca7c23..ae5b1edb 100644 --- a/test/openjd/sessions/test_embedded_files.py +++ b/test/openjd/sessions/test_embedded_files.py @@ -9,7 +9,7 @@ from openjd.sessions._os_checker import is_posix, is_windows import pytest -from utils.windows_acl_helper import principal_has_full_control_of_object +from utils.windows_acl_helper import MODIFY_READ_WRITE_MASK, principal_has_access_to_object from openjd.model import SymbolTable from openjd.model.v2023_09 import DataString as DataString_2023_09 @@ -358,9 +358,9 @@ def test_changes_owner(self, tmp_path: Path, windows_user: WindowsSessionUser) - # THEN assert os.path.exists(filename) - assert principal_has_full_control_of_object( - str(filename), windows_user.user - ), "Windows user has full control" + assert principal_has_access_to_object( + str(filename), windows_user.user, MODIFY_READ_WRITE_MASK + ), "Windows user has access" with open(filename, "r") as file: result_contents = file.read() assert result_contents == testdata, "File contents are as expected" @@ -588,9 +588,9 @@ class Datum: result_contents = file.read() assert result_contents == data.data, "File contents are as expected" # Check file permissions - assert principal_has_full_control_of_object( - filename, windows_user.user - ), "Windows user has full control" + assert principal_has_access_to_object( + filename, windows_user.user, MODIFY_READ_WRITE_MASK + ), "Windows user has access" def test_resolves_symbols(self, tmp_path: Path) -> None: # Tests that the set of files can reference themselves and each other diff --git a/test/openjd/sessions/test_session.py b/test/openjd/sessions/test_session.py index 7f372725..e53f8c03 100644 --- a/test/openjd/sessions/test_session.py +++ b/test/openjd/sessions/test_session.py @@ -371,10 +371,14 @@ def test_cleanup_windows_user( with open(working_dir_file_path, "w") as f: f.write("File content") - WindowsPermissionHelper.set_permissions_full_control(subdir_path, [windows_user.user]) - WindowsPermissionHelper.set_permissions_full_control(subdir_file_path, [windows_user.user]) - WindowsPermissionHelper.set_permissions_full_control( - working_dir_file_path, [windows_user.user] + WindowsPermissionHelper.set_permissions( + subdir_path, principals_full_control=[windows_user.user] + ) + WindowsPermissionHelper.set_permissions( + subdir_file_path, principals_full_control=[windows_user.user] + ) + WindowsPermissionHelper.set_permissions( + working_dir_file_path, principals_full_control=[windows_user.user] ) session.cleanup() diff --git a/test/openjd/sessions/test_session_user.py b/test/openjd/sessions/test_session_user.py index a8553bec..1248f9c0 100644 --- a/test/openjd/sessions/test_session_user.py +++ b/test/openjd/sessions/test_session_user.py @@ -28,11 +28,7 @@ class TestWindowsSessionUser: return_value=False, ) def test_user_not_converted(self, mock_is_process_user, mock_validate_username, user): - windows_session_user = WindowsSessionUser( - user, - password="password", - group="test_group", - ) + windows_session_user = WindowsSessionUser(user, password="password") assert windows_session_user.user == user @@ -41,7 +37,7 @@ def test_no_password_impersonation_throws_exception(self): RuntimeError, match="Must supply a password or logon token. User is not the process owner.", ): - WindowsSessionUser("nonexistent_user", group="test_group") + WindowsSessionUser("nonexistent_user") @pytest.mark.skipif( tests_are_in_windows_session_0(), diff --git a/test/openjd/sessions/test_tempdir.py b/test/openjd/sessions/test_tempdir.py index d4be05bb..1aa01c83 100644 --- a/test/openjd/sessions/test_tempdir.py +++ b/test/openjd/sessions/test_tempdir.py @@ -10,14 +10,19 @@ from openjd.sessions._os_checker import is_posix, is_windows from openjd.sessions._windows_permission_helper import WindowsPermissionHelper from utils.windows_acl_helper import ( - principal_has_full_control_of_object, - principal_has_no_permissions_on_object, + MODIFY_READ_WRITE_MASK, + FULL_CONTROL_MASK, + get_aces_for_object, + principal_has_access_to_object, ) if is_posix(): import grp import pwd +if is_windows(): + from openjd.sessions._win32._helpers import get_process_user # type: ignore + import pytest from unittest.mock import patch @@ -102,38 +107,40 @@ def test_no_write_permission(self) -> None: TempDir(dir=dir) -@pytest.mark.xfail(not is_windows(), reason="Windows-specific tests") +@pytest.mark.skipif(not is_windows(), reason="Windows-specific tests") class TestTempDirWindows: - @patch("openjd.sessions.WindowsSessionUser.is_process_user", return_value=True) - def test_windows_user_with_group_permits_group(self, mock_user_match): - # GIVEN - # Use a builtin group, so we can expect it to exist on any Windows machine - windows_user = WindowsSessionUser("arbitrary_user", group="Users") - - # WHEN - tempdir = TempDir(user=windows_user) - - # THEN - assert principal_has_full_control_of_object(str(tempdir.path), windows_user.group) + @pytest.mark.xfail(not has_windows_user(), reason=WIN_SET_TEST_ENV_VARS_MESSAGE) + @pytest.mark.usefixtures("windows_user") @patch("openjd.sessions.WindowsSessionUser.is_process_user", return_value=True) - def test_wrong_group_not_permitted(self, mock_user_match): + def test_windows_object_permissions(self, mock_user_match, windows_user: WindowsSessionUser): + # Test that TempDir gives the given WindowsSessionUser Modify/R/W, but not Full Control + # permissions on the created directory. + # GIVEN - # Use a builtin group, so we can expect it to exist on any Windows machine - windows_user = WindowsSessionUser("arbitrary_user", group="Users") + process_owner = get_process_user() + if "\\" in process_owner: + # Extract user from NETBIOS name + process_owner = process_owner.split("\\")[1] + elif "@" in process_owner: + # Extract user from domain UPN + process_owner = process_owner.split("@")[0] # WHEN tempdir = TempDir(user=windows_user) + aces = get_aces_for_object(str(tempdir.path)) # THEN - assert principal_has_no_permissions_on_object(str(tempdir.path), "Guests") - + assert len(aces) == 2 # Only self & user + assert aces[process_owner][0] == [FULL_CONTROL_MASK] # allowed + assert aces[process_owner][1] == [] # denied + assert aces[windows_user.user][0] == [MODIFY_READ_WRITE_MASK] # allowed + assert aces[windows_user.user][1] == [] # denied + + @pytest.mark.xfail(not has_windows_user(), reason=WIN_SET_TEST_ENV_VARS_MESSAGE) + @pytest.mark.usefixtures("windows_user") @patch("openjd.sessions.WindowsSessionUser.is_process_user", return_value=True) - def test_windows_user_with_group_permits_group_permissions_inherited(self, mock_user_match): - # GIVEN - # Use a builtin group, so we can expect it to exist on any Windows machine - windows_user = WindowsSessionUser("arbitrary_user", group="Users") - + def test_windows_permissions_inherited(self, mock_user_match, windows_user: WindowsSessionUser): # WHEN tempdir = TempDir(user=windows_user) os.mkdir(tempdir.path / "child_dir") @@ -142,37 +149,30 @@ def test_windows_user_with_group_permits_group_permissions_inherited(self, mock_ open(tempdir.path / "child_dir" / "grandchild_file", "a").close() # THEN - assert principal_has_full_control_of_object(str(tempdir.path), windows_user.group) - assert principal_has_full_control_of_object( - str(tempdir.path / "child_dir"), windows_user.group + assert principal_has_access_to_object( + str(tempdir.path), windows_user.user, MODIFY_READ_WRITE_MASK ) - assert principal_has_full_control_of_object( - str(tempdir.path / "child_file"), windows_user.group + assert principal_has_access_to_object( + str(tempdir.path / "child_dir"), windows_user.user, MODIFY_READ_WRITE_MASK ) - assert principal_has_full_control_of_object( - str(tempdir.path / "child_dir" / "grandchild_dir"), windows_user.group + assert principal_has_access_to_object( + str(tempdir.path / "child_file"), windows_user.user, MODIFY_READ_WRITE_MASK ) - assert principal_has_full_control_of_object( - str(tempdir.path / "child_dir" / "grandchild_file"), windows_user.group + assert principal_has_access_to_object( + str(tempdir.path / "child_dir" / "grandchild_dir"), + windows_user.user, + MODIFY_READ_WRITE_MASK, + ) + assert principal_has_access_to_object( + str(tempdir.path / "child_dir" / "grandchild_file"), + windows_user.user, + MODIFY_READ_WRITE_MASK, ) - - # Mock is_process_user to get around the password requirement, since we're just testing - # file permissions - @patch("openjd.sessions.WindowsSessionUser.is_process_user", return_value=True) - def test_windows_user_without_group_permits_user(self, mock_user_match): - # GIVEN - windows_user = WindowsSessionUser("Guest") - - # WHEN - tempdir = TempDir(user=windows_user) - - # THEN - assert principal_has_full_control_of_object(str(tempdir.path), "Guest") @patch("openjd.sessions.WindowsSessionUser.is_process_user", return_value=True) - def test_invalid_windows_group_raises_exception(self, mock_user_match): + def test_nonvalid_windows_principal_raises_exception(self, mock_user_match): # GIVEN - windows_user = WindowsSessionUser("Guest", group="nonexistent_group") + windows_user = WindowsSessionUser("non_existent_user") # THEN with pytest.raises(RuntimeError, match="Could not change permissions of directory"): @@ -195,6 +195,28 @@ def test_windows_temp_dir(self, monkeypatch, clean_up_directory): Path(expected_dir).parent ), r"Directory C:\ProgramDataForOpenJDTest\Amazon should be created." + def test_cleanup(self, windows_user: WindowsSessionUser) -> None: + # Ensure that we can delete the files in that directory that have been + # created by the other user. + + # GIVEN + tmpdir = TempDir(user=windows_user) + testfilename = str(tmpdir.path / "testfile.txt") + + # Create a file on which only windows_user has permissions + with open(testfilename, "w") as f: + f.write("File content") + WindowsPermissionHelper.set_permissions( + testfilename, principals_full_control=[windows_user.user] + ) + + # WHEN + tmpdir.cleanup() + + # THEN + assert not os.path.exists(testfilename) + assert not os.path.exists(tmpdir.path) + @pytest.mark.usefixtures("posix_target_user", "posix_disjoint_user") class TestTempDirPosixUser: @@ -264,49 +286,3 @@ def test_cannot_change_to_group(self, posix_disjoint_user: PosixSessionUser) -> # WHEN with pytest.raises(RuntimeError): TempDir(user=posix_disjoint_user) - - -@pytest.mark.skipif(not is_windows(), reason="Windows-specific test") -@pytest.mark.xfail(not has_windows_user(), reason=WIN_SET_TEST_ENV_VARS_MESSAGE) -@pytest.mark.usefixtures("windows_user") -class TestTempDirWindowsUser: - """Tests of the TempDir when the resulting directory is to be owned by - a different user than the current process. - """ - - def test_windows_user_without_group_permits_user( - self, windows_user: WindowsSessionUser - ) -> None: - # Ensure that we can create the temporary directory. - - # GIVEN - tmpdir = custom_gettempdir() - - # WHEN - result = TempDir(user=windows_user) - - # THEN - assert str(result.path.parent) == tmpdir - assert os.path.exists(result.path) - - assert principal_has_full_control_of_object(str(result.path), windows_user.user) - - def test_cleanup(self, windows_user: WindowsSessionUser) -> None: - # Ensure that we can delete the files in that directory that have been - # created by the other user. - - # GIVEN - tmpdir = TempDir(user=windows_user) - testfilename = str(tmpdir.path / "testfile.txt") - - # Create a file on which only windows_user has permissions - with open(testfilename, "w") as f: - f.write("File content") - WindowsPermissionHelper.set_permissions_full_control(testfilename, [windows_user.user]) - - # WHEN - tmpdir.cleanup() - - # THEN - assert not os.path.exists(testfilename) - assert not os.path.exists(tmpdir.path) diff --git a/test/openjd/utils/windows_acl_helper.py b/test/openjd/utils/windows_acl_helper.py index 1261782e..be29e1a6 100644 --- a/test/openjd/utils/windows_acl_helper.py +++ b/test/openjd/utils/windows_acl_helper.py @@ -5,7 +5,42 @@ if is_windows(): import win32security -FULL_CONTROL_MASK = 2032127 +MODIFY_READ_WRITE_MASK = 0x1301FF +FULL_CONTROL_MASK = 0x1F01FF + + +def get_aces_for_object(object_path: str) -> dict[str, tuple[list[int], list[int]]]: + """Obtain a dictionary representation of the Access Control Entities (ACEs) of the + given object. The returned dictionary has the form: + { + : ( + [ , ... ], + [ , ... ] + ), + ... + } + """ + return_dict = dict[str, tuple[list[int], list[int]]]() + sd = win32security.GetFileSecurity(object_path, win32security.DACL_SECURITY_INFORMATION) + + dacl = sd.GetSecurityDescriptorDacl() + + for i in range(dacl.GetAceCount()): + ace = dacl.GetAce(i) + + ace_type = ace[0][0] + access_mask = ace[1] + ace_principal_sid = ace[2] + + account_name, _, _ = win32security.LookupAccountSid(None, ace_principal_sid) + if account_name not in return_dict: + return_dict[account_name] = (list[int](), list[int]()) + if ace_type == win32security.ACCESS_ALLOWED_ACE_TYPE: + return_dict[account_name][0].append(access_mask) + elif ace_type == win32security.ACCESS_DENIED_ACE_TYPE: + return_dict[account_name][1].append(access_mask) + + return return_dict def get_aces_for_principal_on_object(object_path: str, principal_name: str): @@ -37,8 +72,6 @@ def get_aces_for_principal_on_object(object_path: str, principal_name: str): access_mask = ace[1] ace_principal_sid = ace[2] - account_name, _, _ = win32security.LookupAccountSid(None, ace_principal_sid) - if ace_principal_sid == principal_to_check_sid: if ace_type == win32security.ACCESS_ALLOWED_ACE_TYPE: access_allowed_masks.append(access_mask) @@ -48,15 +81,9 @@ def get_aces_for_principal_on_object(object_path: str, principal_name: str): return access_allowed_masks, access_denied_masks -def principal_has_full_control_of_object(object_path, principal_name): +def principal_has_access_to_object(object_path, principal_name, access_mask): access_allowed_masks, access_denied_masks = get_aces_for_principal_on_object( object_path, principal_name ) - return FULL_CONTROL_MASK in access_allowed_masks and len(access_denied_masks) == 0 - - -def principal_has_no_permissions_on_object(object_path, principal_name): - access_allowed_masks, _ = get_aces_for_principal_on_object(object_path, principal_name) - - return len(access_allowed_masks) == 0 + return access_allowed_masks == [access_mask] and len(access_denied_masks) == 0