Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan committed Jan 28, 2023
1 parent c19ff6e commit 948e5e2
Showing 1 changed file with 33 additions and 50 deletions.
83 changes: 33 additions & 50 deletions comtypes/test/test_portabledevice.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
# so we don't crash when the generated files don't exist
from __future__ import annotations

from ctypes import (POINTER, _Pointer, pointer, cast,
c_uint32, c_uint16, c_int8, c_ulong, c_wchar_p)
from ctypes import POINTER, _Pointer, pointer, cast, c_ulong, c_wchar_p
from typing import Optional
import unittest as ut

import comtypes
import comtypes.client
from comtypes import GUID

comtypes.client.GetModule("portabledeviceapi.dll")
import comtypes.gen.PortableDeviceApiLib as port_api
Expand All @@ -16,22 +13,10 @@
import comtypes.gen.PortableDeviceTypesLib as port_types


def newGuid(*args: int) -> comtypes.GUID:
guid = comtypes.GUID()
guid.Data1 = c_uint32(args[0])
guid.Data2 = c_uint16(args[1])
guid.Data3 = c_uint16(args[2])
for i in range(8):
guid.Data4[i] = c_int8(args[3 + i])
return guid


def PropertyKey(*args: int) -> _Pointer[port_api._tagpropertykey]:
assert len(args) == 12
assert all(isinstance(x, int) for x in args)
def PropertyKey(fmtid: GUID, pid: int) -> "_Pointer[port_api._tagpropertykey]":
propkey = port_api._tagpropertykey()
propkey.fmtid = newGuid(*args[0:11])
propkey.pid = c_ulong(args[11])
propkey.fmtid = fmtid
propkey.pid = c_ulong(pid)
return pointer(propkey)


Expand Down Expand Up @@ -63,47 +48,41 @@ def setUp(self):
self.device.Open(list(dev_ids)[0], info)

def test_EnumObjects(self):
WPD_OBJECT_NAME = PropertyKey(
0xEF6B490D, 0x5CD8, 0x437A, 0xAF, 0xFC, 0xDA, 0x8B, 0x60, 0xEE, 0x4A, 0x3C, 4)
WPD_OBJECT_CONTENT_TYPE = PropertyKey(
0xEF6B490D, 0x5CD8, 0x437A, 0xAF, 0xFC, 0xDA, 0x8B, 0x60, 0xEE, 0x4A, 0x3C, 7)
WPD_OBJECT_SIZE = PropertyKey(
0xEF6B490D, 0x5CD8, 0x437A, 0xAF, 0xFC, 0xDA, 0x8B, 0x60, 0xEE, 0x4A, 0x3C, 11)
WPD_OBJECT_ORIGINAL_FILE_NAME = PropertyKey(
0xEF6B490D, 0x5CD8, 0x437A, 0xAF, 0xFC, 0xDA, 0x8B, 0x60, 0xEE, 0x4A, 0x3C, 12)
WPD_OBJECT_PARENT_ID = PropertyKey(
0xEF6B490D, 0x5CD8, 0x437A, 0xAF, 0xFC, 0xDA, 0x8B, 0x60, 0xEE, 0x4A, 0x3C, 3)
folderType = newGuid(0x27E2E392, 0xA111, 0x48E0,
0xAB, 0x0C, 0xE1, 0x77, 0x05, 0xA0, 0x5F, 0x85)
functionalType = newGuid(
0x99ED0160, 0x17FF, 0x4C44, 0x9D, 0x98, 0x1D, 0x7A, 0x6F, 0x94, 0x19, 0x21)
WPD_OBJECT_PROPERTIES_V1 = GUID("{EF6B490D-5CD8-437A-AFFC-DA8B60EE4A3C}")
WPD_OBJECT_NAME = PropertyKey(WPD_OBJECT_PROPERTIES_V1, 4)
WPD_OBJECT_CONTENT_TYPE = PropertyKey(WPD_OBJECT_PROPERTIES_V1, 7)
WPD_OBJECT_SIZE = PropertyKey(WPD_OBJECT_PROPERTIES_V1, 11)
WPD_OBJECT_ORIGINAL_FILE_NAME = PropertyKey(WPD_OBJECT_PROPERTIES_V1, 12)
WPD_OBJECT_PARENT_ID = PropertyKey(WPD_OBJECT_PROPERTIES_V1, 3)
folderType = GUID("{27E2E392-A111-48E0-AB0C-E17705A05F85}")
functionalType = GUID("{99ED0160-17FF-4C44-9D98-1D7A6F941921}")

content = self.device.Content()
properties = content.Properties()
propertiesToRead = comtypes.client.CreateObject(
key_collection = comtypes.client.CreateObject(
port_types.PortableDeviceKeyCollection,
clsctx=comtypes.CLSCTX_INPROC_SERVER,
interface=port_api.IPortableDeviceKeyCollection,
)
propertiesToRead.Add(WPD_OBJECT_NAME)
propertiesToRead.Add(WPD_OBJECT_CONTENT_TYPE)
key_collection.Add(WPD_OBJECT_NAME)
key_collection.Add(WPD_OBJECT_CONTENT_TYPE)

def findDir(curObjectID, level: int) -> Optional[str]:
# print(contentID)
def findDir(curObjectId: str, level: int) -> Optional[str]:
"""Searches the directory given by the first parameter for a sub-directory at a given level,
and returns its object ID if it exists"""
if level < 0:
return None
values = properties.GetValues(curObjectID, propertiesToRead)
values = properties.GetValues(curObjectId, key_collection)
contenttype = values.GetGuidValue(WPD_OBJECT_CONTENT_TYPE)
is_folder = contenttype in [folderType, functionalType]
print(
f"{values.GetStringValue(WPD_OBJECT_NAME)} ({'folder' if is_folder else 'file'})")
# print(f"{values.GetStringValue(WPD_OBJECT_NAME)} ({'folder' if is_folder else 'file'})")
if is_folder:
if level == 0:
return curObjectID
return curObjectId
# traverse into the children
enumobj = content.EnumObjects(c_ulong(0), curObjectID, None)
enumobj = content.EnumObjects(c_ulong(0), curObjectId, None)
for x in enumobj:
objId = findDir(x, level-1)
objId = findDir(x, level - 1)
if objId:
return objId
return None # not in this part of the tree
Expand All @@ -113,7 +92,7 @@ def findDir(curObjectID, level: int) -> Optional[str]:
# find a directory 2 levels deep, because you can't write on the top ones
parentId = findDir("DEVICE", 2)
if not parentId:
self.fail("Could not find the parent path on the device")
self.fail("Could not find a directory two levels deep on the device")

buffer = "Text file content".encode("utf-8")
pdv = comtypes.client.CreateObject(
Expand All @@ -132,11 +111,15 @@ def findDir(curObjectID, level: int) -> Optional[str]:
optimalTransferSizeBytes,
_,
) = content.CreateObjectWithPropertiesAndData(
pdv,
optimalTransferSizeBytes,
POINTER(c_wchar_p)(),
pdv, optimalTransferSizeBytes, POINTER(c_wchar_p)()
)
# here: optional calls to fileStream.RemoteWrite(), fileStream.Commit()

# uncomment the code below to actually write the file
# from ctypes import c_ubyte
# c_buf = (c_ubyte * len(buffer)).from_buffer(bytearray(buffer))
# written = fileStream.RemoteWrite(c_buf, c_ulong(len(buffer)))
# self.assertEqual(written, len(buffer))
# fileStream.Commit(c_ulong(0))

# WARNING: It seems like this does not actually create a file unless the file stream is commited,
# though I can't promise with certainty that this is the case on all portable devices.
Expand Down

0 comments on commit 948e5e2

Please sign in to comment.