Source code for genesis_forge.gamepads.gamepad

import time
import argparse
import hid
import threading

from .config import GamepadConfig, GamepadState
from .logitech import LOGITECH_F710_CONFIG, LOGITECH_F310_CONFIG

GAMEPAD_CONFIGS = [
    LOGITECH_F710_CONFIG,
    LOGITECH_F310_CONFIG,
]


[docs] class Gamepad: """ General gamepad controller, which automatically attempts to connect to known gamepads (currentlyLogitech F710 and F310). Example:: >>> gamepad = Gamepad() >>> gamepad.state GamepadState(axis=[0.0, 0.0, 0.0, 0.0], buttons=[A], dpad=UP) >>> gamepad.state.axis [0.0, 0.0, 0.0, 0.0] >>> gamepad.state.buttons ["A"] >>> gamepad.state.dpad "UP" >>> gamepad.state.buttons = [Button.A] Example connecting to a specific gamepad: >>> gamepad = Gamepad(config=LOGITECH_F710_CONFIG) Adapted from: https://github.com/google-deepmind/mujoco_playground/blob/a873d53765a4c83572cf44fa74768ab62ceb7be1/mujoco_playground/experimental/sim2sim/gamepad_reader.py. """ def __init__( self, config: GamepadConfig = None, vendor_id=None, product_id=None, debug=False, ): self._config = config self._vendor_id = vendor_id self._product_id = product_id if vendor_id is None and config is not None: self._vendor_id = config["vendor_id"] if product_id is None and config is not None: self._product_id = config["product_id"] self._state = GamepadState() self._debug = debug self.is_running = True self._device = None self.connect() self.read_thread = threading.Thread(target=self._read_loop, daemon=True) self.read_thread.start() @property def state(self) -> GamepadState: """ The current state of the gamepad. """ return self._state
[docs] def auto_connect(self): """ Loop through the known gamepad configs until one connects. """ for config in GAMEPAD_CONFIGS: self._vendor_id = config["vendor_id"] self._product_id = config["product_id"] self._config = config try: if self.connect(): return except: pass raise IOError(f"Could not find a gamepad to connect to")
[docs] def connect(self, vendor_id=None, product_id=None): """ Attempt to connect to a gamepad. Args: vendor_id: The vendor id of the gamepad to connect to. product_id: The product id of the gamepad to connect to. Returns: True if the gamepad connected successfully, False otherwise. """ if vendor_id is None: vendor_id = self._vendor_id if product_id is None: product_id = self._product_id # If the vendor/product IDs aren't set, loop through the available gamepad configs if product_id is None and vendor_id is None: self.auto_connect() return try: self._device = hid.device() self._device.open(vendor_id, product_id) self._device.set_nonblocking(True) print( f"Connected to gamepad {self._device.get_manufacturer_string()} {self._device.get_product_string()}" ) return True except IOError as e: raise IOError( f"Error connecting to gamepad 0x{vendor_id:04x}:0x{product_id:04x}: {e}" )
[docs] def stop(self): """ Stop reading gamepad input. """ self.is_running = False
def _read_loop(self): """ Wait for gamepad input, and then update the gamepad state. """ while self.is_running: try: data = self._device.read(64) if data: try: self._state = self._parse_data(data) if self._debug: print(self._state) except Exception as e: print(f"Error parsing data: {e}") except Exception as e: print(f"Error reading from device: {e}") self._device.close() def _parse_data(self, data: list[int]) -> GamepadState: """ Parse gamepad data into a GamepadState object. Args: data: The data to parse. Returns: The parsed GamepadState object. """ axis = [] buttons = [] dpad = None # No gamepad config, so we cann't parse the data if self._config is None: return for cfg in self._config["mapping"]: if "data" not in cfg: print(f"Warning: {cfg} has no data value") continue if cfg["data"] >= len(data): print(f"Error: {cfg} data is out of range") continue value = data[cfg["data"]] value_truthy = False # Apply the bitmask to the value if "bitmask" in cfg: value = value & cfg["bitmask"] if value != 0: value_truthy = True elif "button" in cfg or "dpad" in cfg: print(f"Warning: {cfg} has no bitmask value") continue # Check if value is matches if "value" in cfg: value_truthy = value == cfg["value"] if "button" in cfg and value_truthy: buttons.append(cfg["button"].name) elif "dpad" in cfg and value_truthy: dpad = cfg["dpad"].name elif "axis" in cfg: value = -(value - 128) / 128.0 axis.insert(cfg["axis"], value) self._state.axis_values = axis self._state.buttons = buttons self._state.dpad = dpad return self._state
if __name__ == "__main__": parser = argparse.ArgumentParser( description="Test the Gamepad connection", add_help=True ) args = parser.parse_args() gamepad = Gamepad(debug=True) while True: time.sleep(1.0)