Skip to content

Commit

Permalink
Merge pull request #29 from gal20/auto_detection
Browse files Browse the repository at this point in the history
Auto-detect devices based on their LED status
  • Loading branch information
joaorb64 authored Jan 8, 2021
2 parents a1787a9 + d4b35f8 commit 501f81b
Showing 1 changed file with 111 additions and 59 deletions.
170 changes: 111 additions & 59 deletions joycond-cemuhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import subprocess
from termcolor import colored
from collections import OrderedDict
from os.path import basename

MAX_PADS = 4

Expand All @@ -37,7 +38,7 @@ def get_led_status(device):
udev_device = context.list_devices(subsystem='input').match_attribute('uniq', device.uniq.encode()).match_attribute('name', device.name.encode())

# should match only one device
udev_device = list(udev_device)[0]
udev_device = next(iter(udev_device))

# combined device has no parent and would match random leds in the system
if udev_device.parent is None:
Expand All @@ -50,6 +51,22 @@ def get_led_status(device):
leds[name] = status
return leds

def get_player_id(device):
player = 0
for led, status in sorted(get_led_status(device).items()):
if "player" in led:
if status == '1':
player += 1

# Should prevent reading an incorrect player id during a temporary led state in some cases
else:
break

# Combined devices don't have real leds and use evdev API instead
if not player:
player = len(device.leds())
return player

class Message(list):
Types = dict(version=bytes([0x00, 0x00, 0x10, 0x00]),
ports=bytes([0x01, 0x00, 0x10, 0x00]),
Expand Down Expand Up @@ -92,9 +109,11 @@ def __init__(self, server, device, motion_device, handle_events = True):

self.led_status = get_led_status(self.device)

if self.led_status == None or len(self.led_status) == 0:
if not self.led_status:
self.led_status = get_led_status(self.motion_device)

self.player_id = get_player_id(self.device)

self.state = {
"left_analog_x": 0x00,
"left_analog_y": 0x00,
Expand Down Expand Up @@ -198,16 +217,17 @@ def handle_motion_events(self):
self.accel_z = event.value / axis.resolution
except(OSError, RuntimeError) as e:
print("Device motion disconnected: " + self.name)
if not self.event_thread:
self.disconnect()
self.disconnect()
asyncio.get_event_loop().close()

def handle_events(self):
print_verbose("Input events thread started")
try:
asyncio.set_event_loop(asyncio.new_event_loop())

for event in self.device.read_loop():
if self.disconnected:
raise RuntimeError

if event.type == evdev.ecodes.SYN_REPORT:
self.server.report(self)
if event.type == evdev.ecodes.EV_ABS:
Expand All @@ -229,7 +249,6 @@ def handle_events(self):
self.state[ps_key] = 0xFF if event.value == 1 else 0x00
except(OSError, RuntimeError) as e:
print("Device disconnected: " + self.name)
self.disconnect()
asyncio.get_event_loop().close()

def get_battery_dbus_interface(self):
Expand Down Expand Up @@ -514,93 +533,126 @@ def report_clean(self, device):

self._res_data(i, bytes(Message('data', data)))

def add_device(self, d, motion_d, handle_devices = True):
def add_device(self, device, motion_device, handle_devices=True):
# Find an empty slot for the new device
for i, slot in enumerate(self.slots):
if not slot:
self.slots[i] = SwitchDevice(self, d, motion_d, handle_devices)
self.slots[i] = SwitchDevice(self, device, motion_device, handle_devices)
return i

# All four slots have been allocated
print("Unable to use device [" + d.name + "]: Slots full")
self.blacklisted.append(d)
return MAX_PADS

def add_devices(self, device, motion_devices, handle_devices=True):
if not motion_devices:
return

# For the first motion device, start both input thread and motion thread
self.add_device(device, motion_devices.pop(0), handle_devices)

# For additional motion devices, start only motion thread to avoid 'device busy' errors
for motion_device in motion_devices:
self.add_device(device, motion_device, False)

def handle_devices(self):
asyncio.set_event_loop(asyncio.new_event_loop())

print("Looking for Nintendo Switch controllers...")

while True:
try:
# Sort devices by name for more consistent output
evdev_devices = sorted([evdev.InputDevice(path) for path in evdev.list_devices()], key=lambda d: d.name)
evdev_devices = [evdev.InputDevice(path) for path in evdev.list_devices()]

# Filter devices that were already assigned or couldn't be assigned
evdev_devices = [d for d in evdev_devices if d not in self.blacklisted and not any(d in [dd.device, dd.motion_device] for dd in self.slots if dd)]

valid_device_names = ["Nintendo Switch Left Joy-Con",
"Nintendo Switch Right Joy-Con",
"Nintendo Switch Pro Controller",
"Nintendo Switch Combined Joy-Cons"]

# Find new devices
taken_slots = lambda: sum(d is not None for d in self.slots)

# players 1-4, 0 for invalid devices
players = {i:[] for i in range(MAX_PADS+1)}
for d in evdev_devices:
if d in self.blacklisted or d.name not in valid_device_names:
continue
players[get_player_id(d)].append(d)

# Skip devices that were already assigned a slot
elif any(d == device.device for device in self.slots if device):
active_devices = taken_slots()

del players[0]
for player, devices in sorted(players.items()):
if not devices:
continue

print("Found ["+d.name+"] - mac: "+d.uniq)
# This might happen if there are more than 4 players
# This can lead to buggy behaviour and should be blacklisted for now
elif len(devices) > 3:
print(F"More than four players detected. Ignoring player {player}.")
self.blacklisted.extend(devices)
continue

motion_d = []
# Could happen when one Joy-Con in a combined pair is disconnected and then reconnected
previously_assigned = next((slot for slot in self.slots if slot and player == slot.player_id), None)
if previously_assigned:
self.add_devices(previously_assigned.device, devices, previously_assigned.event_thread is None)
continue

# try to automagically identify correct IMU for individual Joy-Cons and Pro Controller
# combined Joy-Cons have blank uniqs and should not be assigned to any random evdev device
try:
motion_d.append(next(dd for dd in evdev_devices if dd.uniq and dd.uniq == d.uniq and dd != d))
# Lone device
if all(d.uniq == devices[0].uniq for d in devices):
devices.sort(key=lambda d: d.name in valid_device_names, reverse=True)
try:
device = next(d for d in devices if d.name in valid_device_names)

# auto-detection failed, ask user to choose motion device
except StopIteration:
print("Select motion provider(s) for ["+d.name+"]: ")
# Device is not yet 'paired'
except StopIteration:
continue
devices.remove(device)
motion_devices = devices

# Allow only Nintendo Switch motion devices
# Filter out devices that are automatically assigned or that were already assigned a slot
allowed_devices = [dd for dd in evdev_devices if ("Nintendo" in dd.name and "IMU" in dd.name
and not any(dd.uniq == device.uniq for device in evdev_devices if device != dd)
and not any(dd == device.motion_device for device in self.slots if device))]
# Paired Joy-Cons
else:
try:
device = next(d for d in devices if "Combined" in d.name)
devices.remove(device)

for i, dd in enumerate(allowed_devices):
print(F"{str(i+1)} {dd.name} - mac: {dd.uniq}")
# Added for compatibility with older versions of joycond
except StopIteration:
combined_devices = [d for d in evdev_devices if d.name == "Nintendo Switch Combined Joy-Cons"]

# Remove duplicates from user's choice to avoid assigning the same device twice
for i in list(OrderedDict.fromkeys(input("").split())):
try:
motion_d.append(allowed_devices[int(i)-1])
except (ValueError, IndexError) as e:
pass
# Sort combined devices by creation time.
# This is the best guess we have to match combined device to it's individual Joy-Cons
if len(combined_devices) > 1:
context = pyudev.Context()
combined_devices.sort(key=lambda d: next(iter(context.list_devices(sys_name=basename(d.path)))).time_since_initialized)

if not motion_d:
print("Not using motion inputs for [" + d.name + "]")
continue
device = combined_devices.pop(0)

print("Using [" + ", ".join([motion.name for motion in motion_d]) + "] as motion provider for [" + d.name + "]")
# Right Joy-Con is mapped first
motion_devices = sorted(devices, key=lambda d: "Right" in d.name, reverse=True)

# For the first motion device, start both input thread and motion thread
self.add_device(d, motion_d.pop(0), True)
if args.right_only or args.left_only:
removed = "Right" if args.left_only else "Left"
removed_device = next((d for d in motion_devices if removed in d.name), None)
if removed_device:
motion_devices.remove(removed_device)
self.blacklisted.append(removed_device)

# For additional motion devices, start only motion thread to avoid 'device busy' errors
for motion in motion_d:
self.add_device(d, motion, False)
self.add_devices(device, motion_devices)

if active_devices != taken_slots():
self.print_slots()

active_devices = taken_slots()

# Detect disconnected devices
for i, slot in enumerate(self.slots):
if slot and slot.disconnected:
self.slots[i] = None
if self.blacklisted:
self.blacklisted.pop()
self.print_slots()

if active_devices != taken_slots():
self.print_slots()

time.sleep(0.2) # sleep for 0.2 seconds to avoid 100% cpu usage
except Exception as e:
Expand Down Expand Up @@ -628,16 +680,11 @@ def print_slots(self):
device += "🎮 Pro"

leds = ""
for led, status in sorted(slot.led_status.items()):
if "player" in led:
leds += "■ " if status == '1' else "□ "

if slot.led_status:
for l in range(1, 5):
led = slot.led_status.get("player"+str(l), 0)

if led == '1':
leds += "■ "
else:
leds += "□ "
else:
if not leds:
leds = "?"

if slot.battery_level:
Expand Down Expand Up @@ -677,6 +724,11 @@ def start(self):
parser.add_argument("-v", "--verbose", help="show debug messages", action="store_true")
parser.add_argument("-ip", "--ip", help="set custom port, default is 127.0.0.1", default="127.0.0.1")
parser.add_argument("-p", "--port", help="set custom port, default is 26760", type=int, default=26760)

select_motion = parser.add_mutually_exclusive_group()
select_motion.add_argument("-l", "--left-only", help="use only left Joy-Cons for combined device motion", action="store_true")
select_motion.add_argument("-r", "--right-only", help="use only right Joy-Cons for combined device motion", action="store_true")

args = parser.parse_args()

# Check if hid_nintendo module is installed
Expand Down

0 comments on commit 501f81b

Please sign in to comment.