Source code for genesis_forge.managers.command.velocity_command

import math
from typing import Tuple, TypedDict

import os
import torch
import genesis as gs

from genesis_forge.genesis_env import GenesisEnv
from genesis_forge.utils import transform_by_quat
from genesis_forge.gamepads import Gamepad

from .command_manager import CommandManager, CommandRangeValue


class VelocityCommandRange(TypedDict):
    lin_vel_x: CommandRangeValue
    lin_vel_y: CommandRangeValue
    ang_vel_z: CommandRangeValue


class VelocityDebugVisualizerConfig(TypedDict):
    """Defines the configuration for the debug visualizer."""

    envs_idx: list[int]
    """The indices of the environments to visualize. If None, all environments will be visualized."""

    arrow_offset: float
    """The vertical offset of the debug arrows from the top of the robot"""

    arrow_radius: float
    """The radius of the shaft of the debug arrows"""

    arrow_max_length: float
    """The maximum length of the debug arrows"""

    commanded_color: Tuple[float, float, float, float]
    """The color of the commanded velocity arrow"""

    actual_color: Tuple[float, float, float, float]
    """The color of the actual robot velocity arrow"""

    fps: int
    """The FPS of the debug visualization. Lower FPS means fewer frames are rendered, saving GPU memory."""


DEFAULT_VISUALIZER_CONFIG: VelocityDebugVisualizerConfig = {
    "envs_idx": None,
    "arrow_offset": 0.12,
    "arrow_radius": 0.02,
    "arrow_max_length": 0.15,
    "commanded_color": (0.0, 0.5, 0.0, 1.0),
    "actual_color": (0.0, 0.0, 0.5, 1.0),
    "fps": 30,
}


[docs] class VelocityCommandManager(CommandManager): """ Generates a velocity command from uniform distribution. The command comprises of a linear velocity in x and y direction and an angular velocity around the z-axis. IMPORTANT: The velocity commands are interpreted as robot-relative coordinates: - X-axis: Forward/backward relative to robot's current orientation - Y-axis: Left/right relative to robot's current orientation - Z-axis: Yaw rotation around robot's vertical axis :::{admonition} Debug Visualization If you set `debug_visualizer` to True, arrows will be rendered above your robot showing the commanded velocity vs the actual velocity. Arrow meanings: - GREEN: Commanded velocity (robot-relative, transformed to world coordinates for visualization) When joystick is "forward", this arrow points in the robot's forward direction - BLUE: Actual robot velocity in world coordinates Args: env: The environment to control range: The ranges of linear & angular velocities standing_probability: The probability of all velocities being zero for an environment (0.0 = never, 1.0 = always) resample_time_sec: The time interval between changing the command debug_visualizer: Enable the debug arrow visualization debug_visualizer_cfg: The configuration for the debug visualizer Example:: class MyEnv(GenesisEnv): def config(self): # Create a velocity command manager self.command_manager = VelocityCommandManager( self, visualize=True, range = { "lin_vel_x_range": (-1.0, 1.0), "lin_vel_y_range": (-1.0, 1.0), "ang_vel_z_range": (-0.5, 0.5), } ) RewardManager( self, logging_enabled=True, cfg={ "tracking_lin_vel": { "weight": 1.0, "fn": rewards.command_tracking_lin_vel, "params": { "vel_cmd_manager": self.velocity_command, }, }, "tracking_ang_vel": { "weight": 1.0, "fn": rewards.command_tracking_ang_vel, "params": { "vel_cmd_manager": self.velocity_command, }, }, # ... other rewards ... }, ) # Observations ObservationManager( self, cfg={ "velocity_cmd": {"fn": self.velocity_command.observation}, # ... other observations ... }, ) """ def __init__( self, env: GenesisEnv, range: VelocityCommandRange, resample_time_sec: float = 5.0, standing_probability: float = 0.0, debug_visualizer: bool = False, debug_visualizer_cfg: VelocityDebugVisualizerConfig = DEFAULT_VISUALIZER_CONFIG, ): super().__init__(env, range=range, resample_time_sec=resample_time_sec) self._arrow_nodes: list = [] self.standing_probability = standing_probability self.debug_visualizer = debug_visualizer self.visualizer_cfg = {**DEFAULT_VISUALIZER_CONFIG, **debug_visualizer_cfg} self.debug_envs_idx = None self._is_standing_env = torch.zeros( env.num_envs, dtype=torch.bool, device=gs.device ) """ Lifecycle Operations """
[docs] def resample_command(self, env_ids: list[int]): """ Overwrites commands for environments that should be standing still. """ super().resample_command(env_ids) if not self.enabled: return # Set standing environments rand_buffer = torch.empty(len(env_ids), device=gs.device).uniform_(0.0, 1.0) self._is_standing_env[env_ids] = rand_buffer <= self.standing_probability standing_envs_idx = self._is_standing_env.nonzero(as_tuple=False).flatten() self._command[standing_envs_idx, :] = 0.0
[docs] def build(self): """Build the velocity command manager""" super().build() self.build_debug()
[docs] def build_debug(self): """Build the debug components of the velocity command manager""" if not self.debug_visualizer or self.visualizer_cfg is None: return # Pre-allocate buffers self._arrow_pos_buffer = torch.zeros(self.env.num_envs, 3, device=gs.device) self._actual_vec_buffer = torch.zeros(self.env.num_envs, 3, device=gs.device) self._vec_3d_buffer = torch.zeros(self.env.num_envs, 3, device=gs.device) self._scene_env_offset = torch.from_numpy(self.env.scene.envs_offset).to( gs.device ) # If debug envs_idx is not set, attempt to use the vis_options rendered_envs_idx self.debug_envs_idx = self.visualizer_cfg.get("envs_idx", None) if self.debug_envs_idx is None and self.env.scene.vis_options is not None: self.debug_envs_idx = self.env.scene.vis_options.rendered_envs_idx if self.debug_envs_idx is None: self.debug_envs_idx = list[int](range(self.env.num_envs)) # Calculate the number of steps per debug render fps = self.visualizer_cfg.get("fps", 30) self._steps_per_debug_render = math.ceil(1.0 / fps / self.env.dt) # Arrow scale factor # Scales the arrow size based on the maximum target velocity range self._arrow_scale_factor = self.visualizer_cfg["arrow_max_length"] / max( *self._range["lin_vel_x"], *self._range["lin_vel_y"], *self._range["ang_vel_z"], )
[docs] def step(self): """Render the command arrows""" if not self.enabled: return super().step() self._render_arrows()
[docs] def use_gamepad( self, gamepad: Gamepad, lin_vel_y_axis: int = 0, lin_vel_x_axis: int = 1, ang_vel_z_axis: int = 2, ): """ Use a connected gamepad to control the command. Args: gamepad: The gamepad to use. lin_vel_x_axis: Map this gamepad axis index to the linear velocity in the x-direction. lin_vel_y_axis: Map this gamepad axis index to the linear velocity in the y-direction. ang_vel_z_axis: Map this gamepad axis index to the angular velocity in the z-direction. """ super().use_gamepad( gamepad, range_axis={ "lin_vel_x": lin_vel_x_axis, "lin_vel_y": lin_vel_y_axis, "ang_vel_z": ang_vel_z_axis, }, invert_axis={ "lin_vel_x": True, "lin_vel_y": True, "ang_vel_z": True, }, )
""" Internal Implementation """ def _render_arrows(self): """ Render the command arrows showing velocity commands and actual robot velocities. The commanded velocity arrow (green) shows the robot-relative velocity command transformed to world coordinates for visualization. The blue arrow is the robot's actual velocity. """ # Is the debug visualizer enabled? if not self.debug_visualizer or len(self.debug_envs_idx) == 0: return # Don't update for every step if self.env.step_count % self._steps_per_debug_render != 0: return # Remove existing arrows for arrow in self._arrow_nodes: self.env.scene.clear_debug_object(arrow) self._arrow_nodes = [] # Calculate the arrow position over the robot self._arrow_pos_buffer[:] = self.env.robot.get_pos() self._arrow_pos_buffer[:, 2] += self.visualizer_cfg["arrow_offset"] self._arrow_pos_buffer += self._scene_env_offset # Transform robot-relative velocity commands to world coordinates for visualization target_velocity = self._target_velocity_in_world_frame() # Actual robot velocity (already in world coordinates) self._actual_vec_buffer[:] = self.env.robot.get_vel() * self._arrow_scale_factor self._actual_vec_buffer[:, 2] = 0.0 for i in self.debug_envs_idx: # Target arrow (robot-relative command transformed to world coordinates for visualization) self._draw_arrow( pos=self._arrow_pos_buffer[i], vec=target_velocity[i], color=self.visualizer_cfg["commanded_color"], ) # Actual arrow self._draw_arrow( pos=self._arrow_pos_buffer[i], vec=self._actual_vec_buffer[i], color=self.visualizer_cfg["actual_color"], ) def _target_velocity_in_world_frame(self) -> torch.Tensor: """ Converts robot-relative XY velocity commands to world coordinates for visualization. This method follows the IsaacLab pattern: 1. Takes robot-relative velocity commands (base frame) 2. Transforms them to world coordinates using the robot's current orientation 3. Scales them for visualization Returns: World-frame velocity vectors scaled for visualization, shape (num_envs, 3) """ # Create 3D velocity tensor with Z component zeroed for 2D visualization self._vec_3d_buffer[:, :2] = self.command[:, :2] self._vec_3d_buffer[:, 2] = 0.0 # Transform from robot-relative (base) frame to world frame using quaternion # This is the inverse of what robot_lin_vel does robot_quat = self.env.robot.get_quat() vec_world = transform_by_quat(self._vec_3d_buffer, robot_quat) # Scale the transformed world velocity vector vec_world[:, :] *= self._arrow_scale_factor return vec_world def _draw_arrow( self, pos: torch.Tensor, vec: torch.Tensor, color: list[float], ): # If velocity is zero, don't draw the arrow if not torch.any(vec != 0.0): return try: node = self.env.scene.draw_debug_arrow( pos=pos.cpu().numpy(), vec=vec.cpu().numpy(), color=color, radius=self.visualizer_cfg["arrow_radius"], ) if node: self._arrow_nodes.append(node) except Exception as e: print(f"Error adding debug visualizing in VelocityCommandManager: {e}")