Source code for genesis_forge.managers.action.position_action_manager

from __future__ import annotations
import re
import torch
import genesis as gs
import numpy as np
from gymnasium import spaces
from typing import Any, Callable, TypeVar

from genesis_forge.genesis_env import GenesisEnv
from genesis_forge.managers.base import BaseManager
from genesis_forge.managers.action.base import BaseActionManager

T = TypeVar("T")
DofValue = dict[str, T] | T
"""Mapping of DOF name (literal or regex) to value."""


def _ensure_dof_pattern(value: DofValue) -> dict[str, Any] | None:
    """
    Ensures the value is a dictionary in the form: {<joint name or regex>: <value>}.

    Example:
        >>> ensure_dof_pattern(50)
        {".*": 50}
        >>> ensure_dof_pattern({".*": 50})
        {".*": 50}
        >>> ensure_dof_pattern({"knee_joint": 50})
        {"knee_joint": 50}

    Args:
        value: The value to convert.

    Returns:
        A dictionary of DOF name pattern to value.
    """
    if value is None:
        return None
    if isinstance(value, dict):
        return value
    return {".*": value}


[docs] class PositionActionManager(BaseActionManager): """ Converts actions to DOF positions, using affine transformations (scale and offset). .. math:: position = offset + scaling * action If `use_default_offset` is `True`, the `offset` will be set to the `default_pos` value for each DOF/joint. Args: env: The environment to manage the DOF actuators for. joint_names: The joint names to manage. default_pos: The default DOF positions. scale: How much to scale the action. offset: Offset factor for the action. use_default_offset: Whether to use default joint positions configured in the articulation asset as offset. Defaults to True. clip: Clip the action values to the range. If omitted, the action values will automatically be clipped to the joint limits. pd_kp: The PD kp values. pd_kv: The PD kv values. max_force: The max force values. damping: The damping values. stiffness: The stiffness values. frictionloss: The frictionloss values. reset_random_scale: Scale all DOF values on reset by this amount +/-. quiet_action_errors: Whether to quiet action errors. randomization_cfg: The randomization configuration used to randomize the DOF values across all environments and between resets. resample_randomization_s: The time interval to resample the randomization values. delay_step: The number of steps to delay the actions for. This is an easy way to emulate the latency in the system. Example:: class MyEnv(ManagedEnvironment): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # ...define scene and robot... def config(self): self.action_manager = PositionActionManager( self, joint_names=".*", scale=0.5, use_default_offset=True, default_pos={ # Hip joints "Leg[1-2]_Hip": -1.0, "Leg[3-4]_Hip": 1.0, # Femur joints "Leg[1-4]_Femur": 0.5, # Tibia joints "Leg[1-4]_Tibia": 0.6, }, pd_kp=50, pd_kv=0.5, max_force=8.0, ) Example using the manager directly:: class MyEnv(GenesisEnv): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # ...define scene and robot... def config(self): self.action_manager = PositionActionManager( self, joint_names=".*", action_scale=0.5, action_offset=0.0, use_default_offset=True, default_pos={ # Hip joints "Leg[1-2]_Hip": -1.0, "Leg[3-4]_Hip": 1.0, # Femur joints "Leg[1-4]_Femur": 0.5, # Tibia joints "Leg[1-4]_Tibia": 0.6, }, pd_kp=50, pd_kv=0.5, max_force=8.0, ) def build(self): super().build() config() step(self, actions: torch.Tensor) -> None: super().step(actions) self.action_manager.step(actions) # ...do other step things... reset(self, envs_idx: list[int] = None) -> None: super().reset(envs_idx) self.action_manager.reset(envs_idx) # ...do other reset things... """ def __init__( self, env: GenesisEnv, joint_names: list[str] | str = ".*", default_pos: DofValue[float] = {".*": 0.0}, scale: DofValue[float] = 1.0, clip: DofValue[tuple[float, float]] = None, offset: DofValue[float] = 0.0, use_default_offset: bool = True, pd_kp: DofValue[float] = None, pd_kv: DofValue[float] = None, max_force: DofValue[float | tuple[float, float]] = None, damping: DofValue[float] = None, stiffness: DofValue[float] = None, frictionloss: DofValue[float] = None, noise_scale: float = 0.0, action_handler: Callable[[torch.Tensor], None] = None, quiet_action_errors: bool = False, delay_step: int = 0, ): super().__init__(env, delay_step) self._has_initialized = False self._default_pos_cfg = _ensure_dof_pattern(default_pos) self._offset_cfg = _ensure_dof_pattern(offset) self._scale_cfg = _ensure_dof_pattern(scale) self._clip_cfg = _ensure_dof_pattern(clip) self._pd_kp_cfg = _ensure_dof_pattern(pd_kp) self._pd_kv_cfg = _ensure_dof_pattern(pd_kv) self._max_force_cfg = _ensure_dof_pattern(max_force) self._damping_cfg = _ensure_dof_pattern(damping) self._stiffness_cfg = _ensure_dof_pattern(stiffness) self._frictionloss_cfg = _ensure_dof_pattern(frictionloss) self._quiet_action_errors = quiet_action_errors self._enabled_dof = None self._noise_scale = noise_scale self._use_default_offset = use_default_offset self._default_dofs_pos: torch.Tensor = None self._dofs_pos_buffer: torch.Tensor = None if use_default_offset and offset != 0.0: raise ValueError("Cannot set both use_default_offset and offset") if isinstance(joint_names, str): self._joint_name_cfg = [joint_names] elif isinstance(joint_names, list): self._joint_name_cfg = joint_names else: raise TypeError(f"Invalid joint_names type: {type(joint_names)}") """ Properties """ @property def action_space(self) -> tuple[float, float]: """ Returns the actions space for the environment, based on the number of DOFs defined in this action manager. """ return spaces.Box( low=-np.inf, high=np.inf, shape=(self.num_actions,), dtype=np.float32, ) @property def num_actions(self) -> int: """ Get the number of actions. """ assert ( self._enabled_dof is not None ), "PositionalActionManager not initialized. You may need to add <PositionalActionManager>.reset() in your environment's reset method." return len(self._enabled_dof) @property def dofs_idx(self) -> list[int]: """ Get the indices of the DOF that are enabled (via joint_names). """ return list(self._enabled_dof.values()) @property def default_dofs_pos(self) -> torch.Tensor: """ Return the default DOF positions. """ return self._default_dofs_pos """ DOF Getters """
[docs] def get_dofs_position(self, noise: float = 0.0): """ Return the current position of the enabled DOFs. This is a wrapper for `RigidEntity.get_dofs_position`. Args: noise: The maximum amount of random noise to add to the position values returned. """ pos = self.env.robot.get_dofs_position(self.dofs_idx) if noise > 0.0: pos = self._add_random_noise(pos, noise) return pos
[docs] def get_dofs_velocity(self, noise: float = 0.0, clip: tuple[float, float] = None): """ Return the current velocity of the enabled DOFs. This is a wrapper for `RigidEntity.get_dofs_velocity`. Args: noise: The maximum amount of random noise to add to the velocity values returned. clip: Clip the velocity returned. """ vel = self.env.robot.get_dofs_velocity(self.dofs_idx) if noise > 0.0: vel = self._add_random_noise(vel, noise) if clip is not None: vel = vel.clamp(**clip) return vel
[docs] def get_dofs_force(self, noise: float = 0.0, clip_to_max_force: bool = False): """ Return the force experienced by the enabled DOFs. This is a wrapper for `RigidEntity.get_dofs_force`. Args: noise: The maximum amount of random noise to add to the force values returned. clip_to_max_force: Clip the force returned to the maximum force defined by the `max_force` parameter. Returns: The force experienced by the enabled DOFs. """ force = self.env.robot.get_dofs_force(self.dofs_idx) if noise > 0.0: force = self._add_random_noise(force, noise) if clip_to_max_force and self._force_range is not None: force = force.clamp(self._force_range[0], self._force_range[1]) return force
""" Operations """
[docs] def build(self): """ Builds the manager and initialized all the buffers. """ # Find all enabled joints by names/patterns self._enabled_dof = dict() for joint in self.env.robot.joints: if joint.type != gs.JOINT_TYPE.REVOLUTE: continue name = joint.name for pattern in self._joint_name_cfg: if re.match(f"^{pattern}$", name): self._enabled_dof[name] = joint.dof_start break # Default DOF positions if self._default_pos_cfg is not None: self._default_dofs_pos = self._get_dof_value_tensor(self._default_pos_cfg) else: self._default_dofs_pos = torch.zeros(self.num_actions, device=gs.device) self._default_dofs_pos = self._default_dofs_pos.unsqueeze(0).expand( self.env.num_envs, -1 ) # Get the joint limits lower_limit, upper_limit = self.env.robot.get_dofs_limit(self.dofs_idx) # Map config params to the DOF indices self._scale_values = None self._offset_values = None self._kp_values = None self._kv_values = None self._damping_values = None self._stiffness_values = None self._frictionloss_values = None self._clip_values = torch.stack([lower_limit, upper_limit], dim=1) if self._scale_cfg is not None: self._scale_values = self._get_dof_value_tensor(self._scale_cfg) if self._clip_cfg is not None: self._get_dof_value_tensor(self._clip_cfg, output=self._clip_values) if self._pd_kp_cfg is not None: self._kp_values = self._get_dof_value_tensor(self._pd_kp_cfg) if self._pd_kv_cfg is not None: self._kv_values = self._get_dof_value_tensor(self._pd_kv_cfg) if self._damping_cfg is not None: self._damping_values = self._get_dof_value_tensor(self._damping_cfg) if self._stiffness_cfg is not None: self._stiffness_values = self._get_dof_value_tensor(self._stiffness_cfg) if self._frictionloss_cfg is not None: self._frictionloss_values = self._get_dof_value_tensor( self._frictionloss_cfg ) if self._use_default_offset: self._offset_values = self._default_dofs_pos else: offset = self._offset_cfg if self._offset_cfg is not None else 0.0 self._offset_values = self._get_dof_value_tensor(offset) # Max force # The value can either be a single float or a tuple range self._force_range = None if self._max_force_cfg is not None: max_force = self._get_dof_value_array(self._max_force_cfg) # Convert values to upper and lower arrays force_upper = [0.0] * self.num_actions force_lower = [0.0] * self.num_actions for i, value in enumerate(max_force): if isinstance(max_force[0], (list, tuple)): force_lower[i] = value[0] force_upper[i] = value[1] else: force_lower[i] = -value force_upper[i] = value self._force_range = ( torch.tensor(force_lower, device=gs.device), torch.tensor(force_upper, device=gs.device), )
[docs] def step(self, actions: torch.Tensor) -> torch.Tensor: """ Take the incoming actions for this step and handle them. Args: actions: The incoming step actions to handle. """ if not self.enabled: return actions = super().step(actions) self._actions = self.handle_actions(actions) return self._actions
[docs] def handle_actions(self, actions: torch.Tensor) -> torch.Tensor: """ Converts the actions to position commands, and send them to the DOF actuators. Override this function if you want to change the action handling logic. Args: actions: The incoming step actions to handle. Returns: The processed and handled actions. """ # Validate actions if not self._quiet_action_errors: if torch.isnan(actions).any(): print(f"ERROR: NaN actions received! Actions: {actions}") if torch.isinf(actions).any(): print(f"ERROR: Infinite actions received! Actions: {actions}") # Process actions actions = actions * self._scale_values + self._offset_values actions = torch.clamp( actions, min=self._clip_values[:, 0], max=self._clip_values[:, 1], ) # Set target positions self.env.robot.control_dofs_position(actions, self.dofs_idx) return actions
[docs] def reset( self, envs_idx: list[int] = None, ): """Reset the DOF positions.""" if not self.enabled: return if envs_idx is None: envs_idx = torch.arange(self.env.num_envs, device=gs.device) # Set DOF values with random scaling if self._kp_values is not None: kp = self._add_random_noise(self._kp_values, self._noise_scale) self.env.robot.set_dofs_kp(kp, self.dofs_idx, envs_idx) if self._kv_values is not None: kv = self._add_random_noise(self._kv_values, self._noise_scale) self.env.robot.set_dofs_kv(kv, self.dofs_idx, envs_idx) if self._damping_values is not None: damping = self._add_random_noise(self._damping_values, self._noise_scale) self.env.robot.set_dofs_damping(damping, self.dofs_idx, envs_idx) if self._stiffness_values is not None: stiffness = self._add_random_noise( self._stiffness_values, self._noise_scale ) self.env.robot.set_dofs_stiffness(stiffness, self.dofs_idx, envs_idx) if self._frictionloss_values is not None: frictionloss = self._add_random_noise( self._frictionloss_values, self._noise_scale ) self.env.robot.set_dofs_frictionloss(frictionloss, self.dofs_idx, envs_idx) if self._force_range is not None: lower = self._add_random_noise(self._force_range[0], self._noise_scale) upper = self._add_random_noise(self._force_range[1], self._noise_scale) self.env.robot.set_dofs_force_range(lower, upper, self.dofs_idx, envs_idx) # Reset DOF positions with random scaling position = self._add_random_noise( self._default_dofs_pos[envs_idx], self._noise_scale ) self.env.robot.set_dofs_position( position=position, dofs_idx_local=self.dofs_idx, envs_idx=envs_idx, )
""" Implementation """ def _get_dof_value_array( self, values: DofValue[T], default_value: T = 0.0, output: torch.Tensor | list[Any] | None = None, ) -> torch.Tensor | list[Any]: """ Given a DofValue dict, loop over the entries, and set the value to the DOF indices (from dofs_idx) that match the pattern. Args: values: The DOF value to convert (for example: `{".*": 50}`). Returns: A list of values for the DOF indices. For example, for 4 DOFs: [50, 50, 50, 50] """ is_set = [False] * self.num_actions if output is None: output = [default_value] * self.num_actions for pattern, value in values.items(): found = False for i, name in enumerate(self._enabled_dof.keys()): if not is_set[i] and re.match(f"^{pattern}$", name): if isinstance(output, torch.Tensor) and not isinstance( value, torch.Tensor ): value = torch.tensor(value, device=gs.device) is_set[i] = True output[i] = value found = True if not found: raise RuntimeError(f"Joint DOF '{pattern}' not found.") return output def _get_dof_value_tensor( self, values: DofValue[T], default_value: T = 0.0, output: torch.Tensor | list[Any] | None = None, ) -> torch.Tensor: """ Wrapper for _get_dof_value_array that returns a tensor. """ values = self._get_dof_value_array(values, default_value, output) return torch.tensor(values, device=gs.device, dtype=gs.tc_float) def _add_random_noise( self, values: torch.Tensor, noise_scale: float = 0.0 ) -> torch.Tensor: """ Add random noise to the tensor values """ if noise_scale == 0.0: return values noise_value = torch.empty_like(values).uniform_(-1, 1) * noise_scale return values + noise_value