From d4b35f83f0e3679e8a9463ee23c8f9f8650bee2d Mon Sep 17 00:00:00 2001 From: gal20 <71563441+gal20@users.noreply.github.com> Date: Thu, 7 Jan 2021 12:51:21 +0200 Subject: [PATCH] Auto-detect devices based on their LED status Using the LED status, we can determine the player index for each device, and automatically assign all slots based on that --- joycond-cemuhook.py | 170 +++++++++++++++++++++++++++++--------------- 1 file changed, 111 insertions(+), 59 deletions(-) diff --git a/joycond-cemuhook.py b/joycond-cemuhook.py index af9c0bc..b047b7a 100644 --- a/joycond-cemuhook.py +++ b/joycond-cemuhook.py @@ -13,6 +13,7 @@ import subprocess from termcolor import colored from collections import OrderedDict +from os.path import basename MAX_PADS = 4 @@ -37,7 +38,7 @@ def get_led_status(device): udev_device = context.list_devices(subsystem='input').match_attribute('uniq', device.uniq.encode()).match_attribute('name', device.name.encode()) # should match only one device - udev_device = list(udev_device)[0] + udev_device = next(iter(udev_device)) # combined device has no parent and would match random leds in the system if udev_device.parent is None: @@ -50,6 +51,22 @@ def get_led_status(device): leds[name] = status return leds +def get_player_id(device): + player = 0 + for led, status in sorted(get_led_status(device).items()): + if "player" in led: + if status == '1': + player += 1 + + # Should prevent reading an incorrect player id during a temporary led state in some cases + else: + break + + # Combined devices don't have real leds and use evdev API instead + if not player: + player = len(device.leds()) + return player + class Message(list): Types = dict(version=bytes([0x00, 0x00, 0x10, 0x00]), ports=bytes([0x01, 0x00, 0x10, 0x00]), @@ -92,9 +109,11 @@ def __init__(self, server, device, motion_device, handle_events = True): self.led_status = get_led_status(self.device) - if self.led_status == None or len(self.led_status) == 0: + if not self.led_status: self.led_status = get_led_status(self.motion_device) + self.player_id = get_player_id(self.device) + self.state = { "left_analog_x": 0x00, "left_analog_y": 0x00, @@ -198,16 +217,17 @@ def handle_motion_events(self): self.accel_z = event.value / axis.resolution except(OSError, RuntimeError) as e: print("Device motion disconnected: " + self.name) - if not self.event_thread: - self.disconnect() + self.disconnect() asyncio.get_event_loop().close() def handle_events(self): print_verbose("Input events thread started") try: asyncio.set_event_loop(asyncio.new_event_loop()) - for event in self.device.read_loop(): + if self.disconnected: + raise RuntimeError + if event.type == evdev.ecodes.SYN_REPORT: self.server.report(self) if event.type == evdev.ecodes.EV_ABS: @@ -229,7 +249,6 @@ def handle_events(self): self.state[ps_key] = 0xFF if event.value == 1 else 0x00 except(OSError, RuntimeError) as e: print("Device disconnected: " + self.name) - self.disconnect() asyncio.get_event_loop().close() def get_battery_dbus_interface(self): @@ -514,11 +533,11 @@ def report_clean(self, device): self._res_data(i, bytes(Message('data', data))) - def add_device(self, d, motion_d, handle_devices = True): + def add_device(self, device, motion_device, handle_devices=True): # Find an empty slot for the new device for i, slot in enumerate(self.slots): if not slot: - self.slots[i] = SwitchDevice(self, d, motion_d, handle_devices) + self.slots[i] = SwitchDevice(self, device, motion_device, handle_devices) return i # All four slots have been allocated @@ -526,6 +545,17 @@ def add_device(self, d, motion_d, handle_devices = True): self.blacklisted.append(d) return MAX_PADS + def add_devices(self, device, motion_devices, handle_devices=True): + if not motion_devices: + return + + # For the first motion device, start both input thread and motion thread + self.add_device(device, motion_devices.pop(0), handle_devices) + + # For additional motion devices, start only motion thread to avoid 'device busy' errors + for motion_device in motion_devices: + self.add_device(device, motion_device, False) + def handle_devices(self): asyncio.set_event_loop(asyncio.new_event_loop()) @@ -533,74 +563,96 @@ def handle_devices(self): while True: try: - # Sort devices by name for more consistent output - evdev_devices = sorted([evdev.InputDevice(path) for path in evdev.list_devices()], key=lambda d: d.name) + evdev_devices = [evdev.InputDevice(path) for path in evdev.list_devices()] + + # Filter devices that were already assigned or couldn't be assigned + evdev_devices = [d for d in evdev_devices if d not in self.blacklisted and not any(d in [dd.device, dd.motion_device] for dd in self.slots if dd)] valid_device_names = ["Nintendo Switch Left Joy-Con", "Nintendo Switch Right Joy-Con", "Nintendo Switch Pro Controller", "Nintendo Switch Combined Joy-Cons"] - # Find new devices + taken_slots = lambda: sum(d is not None for d in self.slots) + + # players 1-4, 0 for invalid devices + players = {i:[] for i in range(MAX_PADS+1)} for d in evdev_devices: - if d in self.blacklisted or d.name not in valid_device_names: - continue + players[get_player_id(d)].append(d) - # Skip devices that were already assigned a slot - elif any(d == device.device for device in self.slots if device): + active_devices = taken_slots() + + del players[0] + for player, devices in sorted(players.items()): + if not devices: continue - print("Found ["+d.name+"] - mac: "+d.uniq) + # This might happen if there are more than 4 players + # This can lead to buggy behaviour and should be blacklisted for now + elif len(devices) > 3: + print(F"More than four players detected. Ignoring player {player}.") + self.blacklisted.extend(devices) + continue - motion_d = [] + # Could happen when one Joy-Con in a combined pair is disconnected and then reconnected + previously_assigned = next((slot for slot in self.slots if slot and player == slot.player_id), None) + if previously_assigned: + self.add_devices(previously_assigned.device, devices, previously_assigned.event_thread is None) + continue - # try to automagically identify correct IMU for individual Joy-Cons and Pro Controller - # combined Joy-Cons have blank uniqs and should not be assigned to any random evdev device - try: - motion_d.append(next(dd for dd in evdev_devices if dd.uniq and dd.uniq == d.uniq and dd != d)) + # Lone device + if all(d.uniq == devices[0].uniq for d in devices): + devices.sort(key=lambda d: d.name in valid_device_names, reverse=True) + try: + device = next(d for d in devices if d.name in valid_device_names) - # auto-detection failed, ask user to choose motion device - except StopIteration: - print("Select motion provider(s) for ["+d.name+"]: ") + # Device is not yet 'paired' + except StopIteration: + continue + devices.remove(device) + motion_devices = devices - # Allow only Nintendo Switch motion devices - # Filter out devices that are automatically assigned or that were already assigned a slot - allowed_devices = [dd for dd in evdev_devices if ("Nintendo" in dd.name and "IMU" in dd.name - and not any(dd.uniq == device.uniq for device in evdev_devices if device != dd) - and not any(dd == device.motion_device for device in self.slots if device))] + # Paired Joy-Cons + else: + try: + device = next(d for d in devices if "Combined" in d.name) + devices.remove(device) - for i, dd in enumerate(allowed_devices): - print(F"{str(i+1)} {dd.name} - mac: {dd.uniq}") + # Added for compatibility with older versions of joycond + except StopIteration: + combined_devices = [d for d in evdev_devices if d.name == "Nintendo Switch Combined Joy-Cons"] - # Remove duplicates from user's choice to avoid assigning the same device twice - for i in list(OrderedDict.fromkeys(input("").split())): - try: - motion_d.append(allowed_devices[int(i)-1]) - except (ValueError, IndexError) as e: - pass + # Sort combined devices by creation time. + # This is the best guess we have to match combined device to it's individual Joy-Cons + if len(combined_devices) > 1: + context = pyudev.Context() + combined_devices.sort(key=lambda d: next(iter(context.list_devices(sys_name=basename(d.path)))).time_since_initialized) - if not motion_d: - print("Not using motion inputs for [" + d.name + "]") - continue + device = combined_devices.pop(0) - print("Using [" + ", ".join([motion.name for motion in motion_d]) + "] as motion provider for [" + d.name + "]") + # Right Joy-Con is mapped first + motion_devices = sorted(devices, key=lambda d: "Right" in d.name, reverse=True) - # For the first motion device, start both input thread and motion thread - self.add_device(d, motion_d.pop(0), True) + if args.right_only or args.left_only: + removed = "Right" if args.left_only else "Left" + removed_device = next((d for d in motion_devices if removed in d.name), None) + if removed_device: + motion_devices.remove(removed_device) + self.blacklisted.append(removed_device) - # For additional motion devices, start only motion thread to avoid 'device busy' errors - for motion in motion_d: - self.add_device(d, motion, False) + self.add_devices(device, motion_devices) + if active_devices != taken_slots(): self.print_slots() - + active_devices = taken_slots() + # Detect disconnected devices for i, slot in enumerate(self.slots): if slot and slot.disconnected: self.slots[i] = None - if self.blacklisted: - self.blacklisted.pop() - self.print_slots() + + if active_devices != taken_slots(): + self.print_slots() time.sleep(0.2) # sleep for 0.2 seconds to avoid 100% cpu usage except Exception as e: @@ -628,16 +680,11 @@ def print_slots(self): device += "🎮 Pro" leds = "" + for led, status in sorted(slot.led_status.items()): + if "player" in led: + leds += "■ " if status == '1' else "□ " - if slot.led_status: - for l in range(1, 5): - led = slot.led_status.get("player"+str(l), 0) - - if led == '1': - leds += "■ " - else: - leds += "□ " - else: + if not leds: leds = "?" if slot.battery_level: @@ -677,6 +724,11 @@ def start(self): parser.add_argument("-v", "--verbose", help="show debug messages", action="store_true") parser.add_argument("-ip", "--ip", help="set custom port, default is 127.0.0.1", default="127.0.0.1") parser.add_argument("-p", "--port", help="set custom port, default is 26760", type=int, default=26760) + +select_motion = parser.add_mutually_exclusive_group() +select_motion.add_argument("-l", "--left-only", help="use only left Joy-Cons for combined device motion", action="store_true") +select_motion.add_argument("-r", "--right-only", help="use only right Joy-Cons for combined device motion", action="store_true") + args = parser.parse_args() # Check if hid_nintendo module is installed