ManagedEnvironment

class genesis_forge.managed_env.ManagedEnvironment(num_envs: int = 1, dt: float = 0.01, max_episode_length_sec: int | None = 10, max_episode_random_scaling: float = 0.0, extras_logging_key: str = 'episode')

Bases: GenesisEnv

An environment that delegates much of its logic to manager classes, which keeps the environment code clean and modular.

Parameters:
  • num_envs – Number of parallel environments.

  • dt – Simulation time step.

  • max_episode_length_sec – Maximum episode length in seconds.

  • max_episode_random_scaling – Randomly scale the maximum episode length by this amount (+/-) so that not all environments reset at the same time.

  • extras_logging_key – The key in the info/extras dictionary (returned by the step and reset functions) under which data is stored for the RL agent to log to TensorBoard.
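The effect of max_episode_random_scaling can be pictured with a small stand-alone sketch. It assumes each environment's limit is the base length in steps multiplied by a factor drawn uniformly from [1 - scaling, 1 + scaling]; the library's exact formula may differ, and randomized_episode_steps is a hypothetical helper, not part of genesis_forge.

```python
import random


def randomized_episode_steps(base_steps, scaling, num_envs, seed=0):
    """Hypothetical helper: per-environment episode limits in steps.

    Assumption: each limit is the base length multiplied by a factor drawn
    uniformly from [1 - scaling, 1 + scaling]. This is not the library's code.
    """
    rng = random.Random(seed)
    limits = []
    for _ in range(num_envs):
        factor = 1.0 + rng.uniform(-scaling, scaling)
        # Never let an episode shrink below a single step.
        limits.append(max(1, round(base_steps * factor)))
    return limits


# With 10% scaling, every limit lands within 900..1100 steps.
limits = randomized_episode_steps(1000, scaling=0.1, num_envs=4)
# With no scaling, every environment shares the base limit.
unscaled = randomized_episode_steps(1000, scaling=0.0, num_envs=4)
```

Spreading the limits this way means the environments hit their time limits on different steps instead of all resetting together.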

Example:

class MyEnv(ManagedEnvironment):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # ...Define scene here...

    def config(self):
        self.action_manager = PositionalActionManager(
            self,
            joint_names=".*",
            pd_kp=50,
            pd_kv=0.5,
            max_force=8.0,
            default_pos={
                # Hip joints
                "Leg[1-2]_Hip": -1.0,
                "Leg[3-4]_Hip": 1.0,
                # Femur joints
                "Leg[1-4]_Femur": 0.5,
                # Tibia joints
                "Leg[1-4]_Tibia": 0.6,
            },
        )
        self.reward_manager = RewardManager(
            self,
            term_cfg={
                "Default pose": {
                    "weight": -1.0,
                    "fn": rewards.dof_similar_to_default,
                    "params": {
                        "dof_action_manager": self.action_manager,
                    },
                },
                "Base height": {
                    "fn": mdp.rewards.base_height,
                    "params": { "target_height": 0.135 },
                    "weight": -100.0,
                },
            },
        )
        ObservationManager(
            self,
            cfg={
                "velocity_cmd": {"fn": self.velocity_command.observation},
                "robot_ang_vel": {
                    "fn": utils.entity_ang_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_lin_vel": {
                    "fn": utils.entity_lin_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_projected_gravity": {
                    "fn": utils.entity_projected_gravity,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_dofs_position": {
                    "fn": self.action_manager.get_dofs_position,
                    "noise": 0.01,
                },
                "actions": {"fn": lambda env=self: env.actions},
            },
        )

add_manager(manager_type: Literal['action', 'reward', 'termination', 'contact', 'terrain', 'entity', 'command', 'observation'], manager: BaseManager)

Adds a manager to the environment. The manager class calls this method automatically.

Parameters:
  • manager_type – The type of manager to add.

  • manager – The manager to add.
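The self-registration described above can be sketched with stand-in classes. SketchEnv and SketchManager are hypothetical and only illustrate the pattern; how genesis_forge stores its managers internally is an assumption here.

```python
class SketchEnv:
    """Stand-in for the environment; not the genesis_forge class."""

    def __init__(self):
        self.managers = {}

    def add_manager(self, manager_type, manager):
        # Group managers by their type string, e.g. "reward" or "observation".
        self.managers.setdefault(manager_type, []).append(manager)


class SketchManager:
    """Stand-in manager that registers itself with the environment."""

    manager_type = "reward"

    def __init__(self, env):
        self.env = env
        # The manager calls add_manager itself, so user code never has to.
        env.add_manager(self.manager_type, self)


env = SketchEnv()
manager = SketchManager(env)
```

A pattern like this would explain why the class example can construct ObservationManager(self, ...) without keeping a reference to the result.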

build()

Builds the environment before the first step. The Genesis scene and all the scene entities must be added before calling this method.

close()

Close the environment.

config()

Override this method and initialize all your managers here.

Example:

def config(self):
    EntityManager(
        self,
        entity_attr="robot",
        on_reset={
            "position": {
                "fn": reset.position,
                "params": {
                    "position": INITIAL_BODY_POSITION,
                    "quat": INITIAL_QUAT,
                },
            },
        },
    )

get_observations() -> torch.Tensor

Returns the current observations for this step. If you use the ObservationManager, this will be handled automatically. Otherwise, override this method to return the observations.

reset(env_ids: list[int] | None = None) -> tuple[torch.Tensor, dict[str, Any]]

Reset one or more environments. Each of the registered managers will also be reset for those environments.

Parameters:

env_ids – The environment ids to reset. If None, all environments are reset.

Returns:

A batch of observations (if env_ids is None) and an info dictionary from the vectorized environment.

set_max_episode_length(max_episode_length_sec: int) -> int

Set or change the maximum episode length.

Parameters:

max_episode_length_sec – The maximum episode length in seconds.

Returns:

The maximum episode length in steps.
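The relationship between the seconds passed in and the steps returned can be sketched as below. The helper seconds_to_steps is hypothetical, and rounding to the nearest whole step is an assumption; the library may round differently.

```python
def seconds_to_steps(max_episode_length_sec, dt):
    """Hypothetical helper: convert an episode length in seconds to steps.

    Assumption: steps = seconds / dt, rounded to the nearest integer.
    """
    if dt <= 0:
        raise ValueError("dt must be positive")
    return round(max_episode_length_sec / dt)


# Constructor defaults from the class signature: 10 seconds at dt = 0.01.
steps = seconds_to_steps(10, 0.01)
```

Under this assumption, the constructor defaults correspond to 1000 simulation steps per episode.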

step(actions: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, dict[str, Any]]

Performs a step in all environments with the given actions.

Parameters:

actions – Batch of actions for each environment with the action_space shape.

Returns:

Batch of (observations, rewards, terminations, truncations, extras)
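A rollout loop that consumes this five-element return value might look like the sketch below. StubEnv is a hypothetical stand-in that only mimics the return shapes of reset and step, with plain Python lists in place of torch tensors so the example runs without Genesis installed.

```python
class StubEnv:
    """Stand-in with the same reset/step return shapes; lists replace tensors."""

    def __init__(self, num_envs=2, max_steps=3):
        self.num_envs = num_envs
        self.max_steps = max_steps
        self.step_count = 0

    def reset(self, env_ids=None):
        self.step_count = 0
        return [[0.0] for _ in range(self.num_envs)], {}

    def step(self, actions):
        self.step_count += 1
        obs = [[float(self.step_count)] for _ in range(self.num_envs)]
        rewards = [1.0] * self.num_envs
        terminations = [False] * self.num_envs
        # Truncate every environment once the time limit is reached.
        truncations = [self.step_count >= self.max_steps] * self.num_envs
        return obs, rewards, terminations, truncations, {}


env = StubEnv()
obs, info = env.reset()
returns = [0.0] * env.num_envs
for _ in range(3):
    actions = [[0.0] for _ in range(env.num_envs)]  # placeholder policy
    obs, rewards, terminations, truncations, extras = env.step(actions)
    returns = [total + r for total, r in zip(returns, rewards)]
    # An episode is over when it either terminated or was truncated.
    dones = [term or trunc for term, trunc in zip(terminations, truncations)]
```

Keeping terminations and truncations separate lets the training code distinguish a failure or success state from an episode that simply ran out of time.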

property action_space: torch.Tensor

The action space, provided by the action manager, if it exists.

property actions: torch.Tensor

The actions for each environment for this step. If you’re using an action manager, these are the actions prior to being handled by the action manager.

property extras: dict

The extras/infos dictionary, which is reset at the start of every step and contains additional data about the environment during that step.

property last_actions: torch.Tensor

The actions for the previous step.

max_episode_length: torch.Tensor

property max_episode_length_sec: int | None

The max episode length, in seconds, for each environment.

property max_episode_length_steps: int | None

The max episode length, in steps, for each environment. If episode randomization scaling is enabled, this will be the base max episode length before scaling.

property num_actions: int

The number of actions for each environment.

property num_observations: int

The number of observations for each environment.

property observation_space: gymnasium.spaces.Space

The observation space for the “policy” observation manager, if it exists.

robot: RigidEntity
scene: gs.Scene
step_count: int
terrain: RigidEntity

property unwrapped

Returns this environment, not a wrapped version of it.