Observation

class genesis_forge.managers.ObservationManager(env: GenesisEnv, cfg: dict[str, ObservationConfig], name: str = 'policy', history_len: int | None = None, noise: tuple[float, float] | None = None)

Bases: BaseManager

Defines the observations and observation space for the environment.

Parameters:
  • env – The environment.

  • cfg – The configuration for the observation manager.

  • name – The name to categorize the observations under, generally used for asymmetrical RL. It’s required to have one observation manager named “policy”.

  • noise – The range of random noise to add to all observations.

  • history_len – The number of previous observations to include in the observation.
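
The noise and history_len parameters can be pictured with a small standalone sketch. It does not use genesis_forge or torch; the names add_uniform_noise and HistoryBuffer are hypothetical, and the uniform distribution, the zero pre-fill, and the oldest-first ordering are assumptions made for illustration rather than documented behavior of ObservationManager.

```python
import random
from collections import deque


def add_uniform_noise(values, noise_range, rng):
    """Add independent noise to each value, drawn uniformly from noise_range.

    Assumption: the (low, high) tuple is treated as a uniform range.
    """
    low, high = noise_range
    return [v + rng.uniform(low, high) for v in values]


class HistoryBuffer:
    """Hypothetical helper that stacks the most recent observations."""

    def __init__(self, obs_size, history_len):
        # Pre-fill with zeros so the stacked output always has the same size.
        self.frames = deque(
            [[0.0] * obs_size for _ in range(history_len)],
            maxlen=history_len,
        )

    def push(self, obs):
        """Store the newest observation and return all frames, oldest first."""
        self.frames.append(list(obs))
        return [x for frame in self.frames for x in frame]


rng = random.Random(0)
noisy = add_uniform_noise([1.0, 2.0], (-0.1, 0.1), rng)

buf = HistoryBuffer(obs_size=2, history_len=3)
stacked = buf.push([1.0, 2.0])  # two zero frames followed by the new frame
```

Under these assumptions, a history length of 3 triples the size of the flattened observation, which is worth keeping in mind when sizing the policy network input.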

Example with ManagedEnvironment:

class MyEnv(ManagedEnvironment):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def config(self):
        ObservationManager(
            self,
            cfg={
                "velocity_cmd": {"fn": self.velocity_command.observation},
                "robot_ang_vel": {
                    "fn": utils.entity_ang_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_lin_vel": {
                    "fn": utils.entity_lin_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_projected_gravity": {
                    "fn": utils.entity_projected_gravity,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_dofs_position": {
                    "fn": self.action_manager.get_dofs_position,
                    "noise": 0.01,
                },
                "actions": {"fn": lambda: env.actions},
            },
        )

Example using the observation manager directly:

class MyEnv(GenesisEnv):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.observation_manager = ObservationManager(
            self,
            cfg={
                "velocity_cmd": {"fn": self.velocity_command.observation},
                "robot_ang_vel": {
                    "fn": utils.entity_ang_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_lin_vel": {
                    "fn": utils.entity_lin_vel,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_projected_gravity": {
                    "fn": utils.entity_projected_gravity,
                    "params": {"entity": self.robot},
                    "noise": 0.1,
                },
                "robot_dofs_position": {
                    "fn": self.action_manager.get_dofs_position,
                    "noise": 0.01,
                },
                "actions": {"fn": lambda: env.actions},
            },
        )

    @property
    def observation_space(self):
        return self.observation_manager.observation_space

    def build(self):
        super().build()
        self.observation_manager.build()

    def step(self, actions: torch.Tensor):
        super().step(actions)

        # ... step logic ...

        obs = self.observation_manager.get_observations()
        return obs, rewards, terminations, timeouts, info

    def reset(self, envs_idx: list[int] | None = None):
        super().reset(envs_idx)

        # ... reset logic ...

        obs = self.observation_manager.get_observations()
        return obs, info

build()

Determine the observation space and set up the buffers.

get_observations(values: dict[str, float | torch.Tensor] | None = None) → torch.Tensor

Generate current observations for all environments.

Optionally, you can provide the observation values directly as a dictionary of values, and this method will return the formatted/scaled (without noise) tensor for the policy. This is useful for manual deployments or troubleshooting.

Parameters:

values – (optional) If provided, these values are used instead of fetching observations from the config functions. The dict is expected to contain a key for every observation configuration. The values are scaled according to the configuration but receive no noise, which makes this suitable for supplying observations during deployment.

Returns:

The observations for all environments.
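
The values path described above can be sketched without the library. The function below is a hypothetical stand-in, not the ObservationManager implementation: it uses plain Python lists instead of torch tensors, and it assumes that each observation config may carry a "scale" entry and that items are concatenated in config order.

```python
def format_observations(cfg, values):
    """Flatten `values` into one vector, in cfg key order, with scaling only.

    Hypothetical stand-in for the documented behavior: every config key must
    be present in `values`, scaling is applied, and no noise is added.
    """
    missing = [key for key in cfg if key not in values]
    if missing:
        raise KeyError(f"missing observation values: {missing}")

    flat = []
    for key, item in cfg.items():
        scale = item.get("scale", 1.0)  # assumed config key; defaults to no scaling
        value = values[key]
        # Accept either a single number or a sequence of numbers.
        parts = value if isinstance(value, (list, tuple)) else [value]
        flat.extend(float(x) * scale for x in parts)
    return flat


cfg = {"lin_vel": {"scale": 2.0}, "height": {}}
obs = format_observations(cfg, {"lin_vel": [1.0, -0.5, 0.0], "height": 0.3})
```

With the real manager, the equivalent call would pass the same kind of dict through the values argument of get_observations, for example with sensor readings collected on a physical robot.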

reset(envs_idx: list[int] | None = None)

Called when one or more environments have been reset.

step()

Called when the environment is stepped.

property name: str

The name to categorize the observations under. This is generally used for asymmetrical RL, and one observation manager must be named “policy”.

property observation_space: gymnasium.spaces.Space

The observation space.