Observation#
- class genesis_forge.managers.ObservationManager(env: GenesisEnv, cfg: dict[str, ObservationConfig], name: str = 'policy', history_len: int | None = None, noise: tuple[float, float] | None = None)[source]#
Bases:
BaseManagerDefines the observations and observation space for the environment.
- Parameters:
env – The environment.
cfg – The configuration for the observation manager.
name – The name to categorize the observations under, generally used for asymmetrical RL. It’s required to have one observation manager named “policy”.
noise – The range of random noise to add to all observations.
history_len – The number of previous observations to include in the observation.
Example with ManagedEnvironment:
class MyEnv(ManagedEnvironment): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) config(self): ObservationManager( self, cfg={ "velocity_cmd": {"fn": self.velocity_command.observation}, "robot_ang_vel": { "fn": utils.entity_ang_vel, "params": {"entity": self.robot}, "noise": 0.1, }, "robot_lin_vel": { "fn": utils.entity_lin_vel, "params": {"entity": self.robot}, "noise": 0.1, }, "robot_projected_gravity": { "fn": utils.entity_projected_gravity, "params": {"entity": self.robot}, "noise": 0.1, }, "robot_dofs_position": { "fn": self.action_manager.get_dofs_position, "noise": 0.01, }, "actions": {"fn": lambda: env.actions}, }, )
Example using the observation manager directly:
class MyEnv(GenesisEnv): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.observation_manager = ObservationManager( self, cfg={ "velocity_cmd": {"fn": self.velocity_command.observation}, "robot_ang_vel": { "fn": utils.entity_ang_vel, "params": {"entity": self.robot}, "noise": 0.1, }, "robot_lin_vel": { "fn": utils.entity_lin_vel, "params": {"entity": self.robot}, "noise": 0.1, }, "robot_projected_gravity": { "fn": utils.entity_projected_gravity, "params": {"entity": self.robot}, "noise": 0.1, }, "robot_dofs_position": { "fn": self.action_manager.get_dofs_position, "noise": 0.01, }, "actions": {"fn": lambda: env.actions}, }, ) @property observation_space(self): return self.obs_manager.observation_space def build(self): super().build() self.obs_manager.build() def step(self, actions: torch.Tensor): super().step(actions) # ... step logic ... obs = self.observation_manager.observation() return obs, rewards, terminations, timeouts, info def reset(self, envs_idx: list[int] | None = None): super().reset(envs_idx) # ... reset logic ... obs = self.observation_manager.observation() return obs, info
- get_observations() torch.Tensor[source]#
Generate current observations for all environments.
- step()#
Called when the environment is stepped
- property name: str#
The name to categorize the observations under This is generally used for asymmetrical RL and it’s required to have one observation manager named “policy”.
- property observation_space: gymnasium.spaces.Space#
The observation space.