Observation#

class genesis_forge.managers.ObservationManager(env: GenesisEnv, cfg: dict[str, ObservationConfig], noise: tuple[float, float] | None = None)[source]#

Bases: BaseManager

Defines the observations and observation space for the environment.

Parameters:
  • env – The environment.

  • cfg – The configuration for the observation manager.

  • noise – The range of random noise to add to all observations.

Example with ManagedEnvironment:

class MyEnv(ManagedEnvironment):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    config(self):
        ObservationManager(
            self,
            cfg={
                "velocity_cmd": {"fn": self.velocity_command.observation},
                "robot_ang_vel": {
                    "fn": utils.entity_ang_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_lin_vel": {
                    "fn": utils.entity_lin_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_projected_gravity": {
                    "fn": utils.entity_projected_gravity,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_dofs_position": {
                    "fn": self.action_manager.get_dofs_position,
                    "noise": 0.01,
                },
                "actions": {"fn": lambda: env.actions},
            },
        )

Example using the observation manager directly:

class MyEnv(GenesisEnv):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.observation_manager = ObservationManager(
            self,
            cfg={
                "velocity_cmd": {"fn": self.velocity_command.observation},
                "robot_ang_vel": {
                    "fn": utils.entity_ang_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_lin_vel": {
                    "fn": utils.entity_lin_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_projected_gravity": {
                    "fn": utils.entity_projected_gravity,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_dofs_position": {
                    "fn": self.action_manager.get_dofs_position,
                    "noise": 0.01,
                },
                "actions": {"fn": lambda: env.actions},
            },
        )

    @property
    observation_space(self):
        return self.obs_manager.observation_space

    def build(self):
        super().build()
        self.obs_manager.build()

    def step(self, actions: torch.Tensor):
        super().step(actions)

        # ... step logic ...

        obs = self.observation_manager.observation()
        return obs, rewards, terminations, timeouts, info

    def reset(self, envs_idx: list[int] | None = None):
        super().reset(envs_idx)

        # ... reset logic ...

        obs = self.observation_manager.observation()
        return obs, info
build()[source]#

Make an initial observation in order to determine the observation space

get_observations() torch.Tensor[source]#

Generate current observations for all environments.

reset(envs_idx: list[int] | None = None)#

One or more environments have been reset

step()#

Called when the environment is stepped

property observation_space: gymnasium.spaces.Space#

The observation space.