Skip to content

task_base

BaseTask

Bases: GymObservable, Registerable

Base Task class. Task-specific reset_scene, reset_agent, step methods are implemented in subclasses

Parameters:

Name Type Description Default
termination_config None or dict

Keyword-mapped configuration to use to generate termination conditions. This should be specific to the task class. Default is None, which corresponds to a default config being usd. Note that any keyword required by a specific task class but not specified in the config will automatically be filled in with the default config. See cls.default_termination_config for default values used

None
reward_config None or dict

Keyword-mapped configuration to use to generate reward functions. This should be specific to the task class. Default is None, which corresponds to a default config being usd. Note that any keyword required by a specific task class but not specified in the config will automatically be filled in with the default config. See cls.default_reward_config for default values used

None
Source code in omnigibson/tasks/task_base.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
class BaseTask(GymObservable, Registerable, metaclass=ABCMeta):
    """
    Base Task class.
    Task-specific reset_scene, reset_agent, step methods are implemented in subclasses

    Args:
        termination_config (None or dict): Keyword-mapped configuration to use to generate termination conditions. This
            should be specific to the task class. Default is None, which corresponds to a default config being usd.
            Note that any keyword required by a specific task class but not specified in the config will automatically
            be filled in with the default config. See cls.default_termination_config for default values used
        reward_config (None or dict): Keyword-mapped configuration to use to generate reward functions. This should be
            specific to the task class. Default is None, which corresponds to a default config being usd. Note that
            any keyword required by a specific task class but not specified in the config will automatically be filled
            in with the default config. See cls.default_reward_config for default values used
    """

    def __init__(self, termination_config=None, reward_config=None):
        # Make sure configs are dictionaries
        termination_config = dict() if termination_config is None else termination_config
        reward_config = dict() if reward_config is None else reward_config

        # Sanity check termination and reward conditions -- any keys found in the inputted config but NOT
        # found in the default config should raise an error
        unknown_termination_keys = set(termination_config.keys()) - set(self.default_termination_config.keys())
        assert (
            len(unknown_termination_keys) == 0
        ), f"Got unknown termination config keys inputted: {unknown_termination_keys}"
        unknown_reward_keys = set(reward_config.keys()) - set(self.default_reward_config.keys())
        assert len(unknown_reward_keys) == 0, f"Got unknown reward config keys inputted: {unknown_reward_keys}"

        # Combine with defaults and store internally
        self._termination_config = self.default_termination_config
        self._termination_config.update(termination_config)
        self._reward_config = self.default_reward_config
        self._reward_config.update(reward_config)

        # Generate reward and termination functions
        self._termination_conditions = self._create_termination_conditions()
        self._reward_functions = self._create_reward_functions()

        # Store other internal vars that will be populated at runtime
        self._loaded = False
        self._reward = None
        self._done = None
        self._success = None
        self._info = None
        self._low_dim_obs_dim = None

        # Run super init
        super().__init__()

    @abstractmethod
    def _load(self, env):
        """
        Load this task. Should be implemented by subclass. Can include functionality, e.g.: loading dynamic objects
        into the environment
        """
        raise NotImplementedError()

    @abstractmethod
    def _load_non_low_dim_observation_space(self):
        """
        Loads any non-low dim observation spaces for this task.

        Returns:
            dict: Keyword-mapped observation space for this object mapping non low dim task observation name to
                observation space
        """
        raise NotImplementedError()

    @classmethod
    def verify_scene_and_task_config(cls, scene_cfg, task_cfg):
        """
        Runs any necessary sanity checks on the scene and task configs passed; and possibly modifies them in-place

        Args:
            scene_cfg (dict): Scene configuration
            task_cfg (dict): Task configuration
        """
        # Default is no-op
        pass

    def _load_observation_space(self):
        # Create the non low dim obs space
        obs_space = self._load_non_low_dim_observation_space()

        # Create the low dim obs space and add to the main obs space dict -- make sure we're flattening low dim obs
        if self._low_dim_obs_dim > 0:
            obs_space["low_dim"] = self._build_obs_box_space(
                shape=(self._low_dim_obs_dim,), low=-float("inf"), high=float("inf"), dtype=NumpyTypes.FLOAT32
            )

        return obs_space

    def load(self, env):
        """
        Load this task
        """
        # Make sure the scene is of the correct type!
        assert any([issubclass(env.scene.__class__, valid_cls) for valid_cls in self.valid_scene_types]), (
            f"Got incompatible scene type {env.scene.__class__.__name__} for task {self.__class__.__name__}! "
            f"Scene class must be a subclass of at least one of: "
            f"{[cls_type.__name__ for cls_type in self.valid_scene_types]}"
        )

        # Run internal method
        self._load(env=env)

        # We're now initialized
        self._loaded = True

    def post_play_load(self, env):
        """
        Complete any loading tasks that require the simulator to be playing

        Args:
            env (Environment): environment instance
        """
        # Compute the low dimensional observation dimension
        obs = self.get_obs(env=env, flatten_low_dim=True)
        self._low_dim_obs_dim = len(obs["low_dim"]) if "low_dim" in obs else 0

    @property
    def task_metadata(self):
        """
        Returns:
            dict: Relevant metadata for the current task
        """
        # Default is empty dictionary
        return dict()

    def write_task_metadata(self):
        """
        Store any relevant task metadata that should be written when the simulation state is saved
        """
        # Write to sim
        og.sim.write_metadata(key="task", data=self.task_metadata)

    def load_task_metadata(self):
        """
        Load relevant task metadata stored in the simulator

        Returns:
            dict: Relevant metadata for the ucrrent task
        """
        # Load from sim
        return og.sim.get_metadata(key="task")

    @abstractmethod
    def _create_termination_conditions(self):
        """
        Creates the termination functions in the environment

        Returns:
            dict of BaseTerminationCondition: Termination functions created for this task
        """
        raise NotImplementedError()

    @abstractmethod
    def _create_reward_functions(self):
        """
        Creates the reward functions in the environment

        Returns:
            dict of BaseRewardFunction: Reward functions created for this task
        """
        raise NotImplementedError()

    def _reset_scene(self, env):
        """
        Task-specific scene reset. Default is the normal scene reset

        Args:
            env (Environment): environment instance
        """
        env.scene.reset()

    def _reset_agent(self, env):
        """
        Task-specific agent reset

        Args:
            env (Environment): environment instance
        """
        # Default is no-op
        pass

    def _reset_variables(self, env):
        """
        Task-specific internal variable reset

        Args:
            env (Environment): environment instance
        """
        # By default, reset reward, done, and info
        self._reward = None
        self._done = False
        self._success = False
        self._info = None

    def reset(self, env):
        """
        Resets this task in the environment

        Args:
            env (Environment): environment instance to reset
        """
        # Reset the scene, agent, and variables
        self._reset_scene(env)
        self._reset_agent(env)
        self._reset_variables(env)

        # Also reset all termination conditions and reward functions
        for termination_condition in self._termination_conditions.values():
            termination_condition.reset(self, env)
        for reward_function in self._reward_functions.values():
            reward_function.reset(self, env)

    def _step_termination(self, env, action, info=None):
        """
        Step and aggregate termination conditions

        Args:
            env (Environment): Environment instance
            action (n-array): 1D flattened array of actions executed by all agents in the environment
            info (None or dict): Any info to return

        Returns:
            2-tuple:
                - float: aggregated termination at the current timestep
                - dict: any information passed through this function or generated by this function
        """
        # Get all dones and successes from individual termination conditions
        dones = []
        successes = []
        info = dict() if info is None else info
        if "termination_conditions" not in info:
            info["termination_conditions"] = dict()
        for name, termination_condition in self._termination_conditions.items():
            d, s = termination_condition.step(self, env, action)
            dones.append(d)
            successes.append(s)
            info["termination_conditions"][name] = {
                "done": d,
                "success": s,
            }
        # Any True found corresponds to a done / success
        done = sum(dones) > 0
        success = sum(successes) > 0

        # Populate info
        info["success"] = success
        return done, info

    def _step_reward(self, env, action, info=None):
        """
        Step and aggregate reward functions

        Args:
            env (Environment): Environment instance
            action (n-array): 1D flattened array of actions executed by all agents in the environment
            info (None or dict): Any info to return

        Returns:
            2-tuple:
                - float: aggregated reward at the current timestep
                - dict: any information passed through this function or generated by this function
        """
        # Make sure info is a dict
        total_info = dict() if info is None else info
        # We'll also store individual reward split as well
        breakdown_dict = dict()
        # Aggregate rewards over all reward functions
        total_reward = 0.0
        for reward_name, reward_function in self._reward_functions.items():
            reward, reward_info = reward_function.step(self, env, action)
            total_reward += reward
            breakdown_dict[reward_name] = reward
            total_info[reward_name] = reward_info

        # Store breakdown dict
        total_info["reward_breakdown"] = breakdown_dict

        return total_reward, total_info

    @abstractmethod
    def _get_obs(self, env):
        """
        Get task-specific observation

        Args:
            env (Environment): Environment instance

        Returns:
            2-tuple:
                - dict: Keyword-mapped low dimensional observations from this task
                - dict: All other keyword-mapped observations from this task
        """
        raise NotImplementedError()

    def _flatten_low_dim_obs(self, obs):
        """
        Flattens dictionary containing low-dimensional observations @obs and converts it from a dictionary into a
        1D numpy array

        Args:
            obs (dict): Low-dim observation dictionary where each value is a 1D array

        Returns:
            n-array: 1D-numpy array of flattened low-dim observations
        """
        # By default, we simply concatenate all values in our obs dict
        return th.cat([ob for ob in obs.values()]) if len(obs.values()) > 0 else th.empty(0)

    def get_obs(self, env, flatten_low_dim=True):
        # Args: env (Environment): environment instance
        # Args: flatten_low_dim (bool): Whether to flatten low-dimensional observations

        # Grab obs internally
        low_dim_obs, obs = self._get_obs(env=env)

        # Possibly flatten low dim and add to main observation dictionary
        if low_dim_obs:
            obs["low_dim"] = self._flatten_low_dim_obs(obs=low_dim_obs) if flatten_low_dim else low_dim_obs

        return obs

    def step(self, env, action):
        """
        Perform task-specific step for every timestep

        Args:
            env (Environment): Environment instance
            action (n-array): 1D flattened array of actions executed by all agents in the environment

        Returns:
            3-tuple:
                - float: reward calculated after this step
                - bool: whether task is done or not
                - dict: nested dictionary of reward- and done-related info
        """
        # Make sure we're initialized
        assert self._loaded, "Task must be loaded using load() before calling step()!"

        # We calculate termination conditions first and then rewards
        # (since some rewards can rely on termination conditions to update)
        done, done_info = self._step_termination(env=env, action=action)
        reward, reward_info = self._step_reward(env=env, action=action)

        # Update the internal state of this task
        self._reward = reward
        self._done = done
        self._success = done_info["success"]
        self._info = {
            "reward": reward_info,
            "done": done_info,
        }

        return self._reward, self._done, deepcopy(self._info)

    @property
    def name(self):
        """
        Returns:
            str: Name of this task. Defaults to class name
        """
        return self.__class__.__name__

    @property
    def reward(self):
        """
        Returns:
            float: Current reward for this task
        """
        assert self._reward is not None, "At least one step() must occur before reward can be calculated!"
        return self._reward

    @property
    def done(self):
        """
        Returns:
            bool: Whether this task is done or not
        """
        assert self._done is not None, "At least one step() must occur before done can be calculated!"
        return self._done

    @property
    def success(self):
        """
        Returns:
            bool: Whether this task has succeeded or not
        """
        assert self._success is not None, "At least one step() must occur before success can be calculated!"
        return self._success

    @property
    def info(self):
        """
        Returns:
            dict: Nested dictionary of information for this task, including reward- and done-specific information
        """
        assert self._info is not None, "At least one step() must occur before info can be calculated!"
        return self._info

    @classproperty
    def valid_scene_types(cls):
        """
        Returns:
            set of Scene: Scene type(s) that are valid (i.e.: compatible) with this specific task. This will be
                used to sanity check the task + scene combination at runtime
        """
        raise NotImplementedError()

    @classproperty
    def default_reward_config(cls):
        """
        Returns:
            dict: Default reward configuration for this class. Should include any kwargs necessary for
                any of the reward classes generated in self._create_rewards(). Note: this default config
                should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
                will raise an error!
        """
        raise NotImplementedError()

    @classproperty
    def default_termination_config(cls):
        """
        Returns:
            dict: Default termination configuration for this class. Should include any kwargs necessary for
                any of the termination classes generated in self._create_terminations(). Note: this default config
                should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
                will raise an error!
        """
        raise NotImplementedError()

    @classproperty
    def _do_not_register_classes(cls):
        # Don't register this class since it's an abstract template
        classes = super()._do_not_register_classes
        classes.add("BaseTask")
        return classes

    @classproperty
    def _cls_registry(cls):
        # Global registry
        global REGISTERED_TASKS
        return REGISTERED_TASKS

done property

Returns:

Type Description
bool

Whether this task is done or not

info property

Returns:

Type Description
dict

Nested dictionary of information for this task, including reward- and done-specific information

name property

Returns:

Type Description
str

Name of this task. Defaults to class name

reward property

Returns:

Type Description
float

Current reward for this task

success property

Returns:

Type Description
bool

Whether this task has succeeded or not

task_metadata property

Returns:

Type Description
dict

Relevant metadata for the current task

default_reward_config()

Returns:

Type Description
dict

Default reward configuration for this class. Should include any kwargs necessary for any of the reward classes generated in self._create_rewards(). Note: this default config should be fully verbose -- any keys inputted in the constructor but NOT found in this default config will raise an error!

Source code in omnigibson/tasks/task_base.py
@classproperty
def default_reward_config(cls):
    """
    Returns:
        dict: Default reward configuration for this class. Should include any kwargs necessary for
            any of the reward classes generated in self._create_rewards(). Note: this default config
            should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
            will raise an error!
    """
    raise NotImplementedError()

default_termination_config()

Returns:

Type Description
dict

Default termination configuration for this class. Should include any kwargs necessary for any of the termination classes generated in self._create_terminations(). Note: this default config should be fully verbose -- any keys inputted in the constructor but NOT found in this default config will raise an error!

Source code in omnigibson/tasks/task_base.py
@classproperty
def default_termination_config(cls):
    """
    Returns:
        dict: Default termination configuration for this class. Should include any kwargs necessary for
            any of the termination classes generated in self._create_terminations(). Note: this default config
            should be fully verbose -- any keys inputted in the constructor but NOT found in this default config
            will raise an error!
    """
    raise NotImplementedError()

load(env)

Load this task

Source code in omnigibson/tasks/task_base.py
def load(self, env):
    """
    Load this task
    """
    # Make sure the scene is of the correct type!
    assert any([issubclass(env.scene.__class__, valid_cls) for valid_cls in self.valid_scene_types]), (
        f"Got incompatible scene type {env.scene.__class__.__name__} for task {self.__class__.__name__}! "
        f"Scene class must be a subclass of at least one of: "
        f"{[cls_type.__name__ for cls_type in self.valid_scene_types]}"
    )

    # Run internal method
    self._load(env=env)

    # We're now initialized
    self._loaded = True

load_task_metadata()

Load relevant task metadata stored in the simulator

Returns:

Type Description
dict

Relevant metadata for the ucrrent task

Source code in omnigibson/tasks/task_base.py
def load_task_metadata(self):
    """
    Load relevant task metadata stored in the simulator

    Returns:
        dict: Relevant metadata for the ucrrent task
    """
    # Load from sim
    return og.sim.get_metadata(key="task")

post_play_load(env)

Complete any loading tasks that require the simulator to be playing

Parameters:

Name Type Description Default
env Environment

environment instance

required
Source code in omnigibson/tasks/task_base.py
def post_play_load(self, env):
    """
    Complete any loading tasks that require the simulator to be playing

    Args:
        env (Environment): environment instance
    """
    # Compute the low dimensional observation dimension
    obs = self.get_obs(env=env, flatten_low_dim=True)
    self._low_dim_obs_dim = len(obs["low_dim"]) if "low_dim" in obs else 0

reset(env)

Resets this task in the environment

Parameters:

Name Type Description Default
env Environment

environment instance to reset

required
Source code in omnigibson/tasks/task_base.py
def reset(self, env):
    """
    Resets this task in the environment

    Args:
        env (Environment): environment instance to reset
    """
    # Reset the scene, agent, and variables
    self._reset_scene(env)
    self._reset_agent(env)
    self._reset_variables(env)

    # Also reset all termination conditions and reward functions
    for termination_condition in self._termination_conditions.values():
        termination_condition.reset(self, env)
    for reward_function in self._reward_functions.values():
        reward_function.reset(self, env)

step(env, action)

Perform task-specific step for every timestep

Parameters:

Name Type Description Default
env Environment

Environment instance

required
action n - array

1D flattened array of actions executed by all agents in the environment

required

Returns:

Type Description
3 - tuple
  • float: reward calculated after this step
  • bool: whether task is done or not
  • dict: nested dictionary of reward- and done-related info
Source code in omnigibson/tasks/task_base.py
def step(self, env, action):
    """
    Perform task-specific step for every timestep

    Args:
        env (Environment): Environment instance
        action (n-array): 1D flattened array of actions executed by all agents in the environment

    Returns:
        3-tuple:
            - float: reward calculated after this step
            - bool: whether task is done or not
            - dict: nested dictionary of reward- and done-related info
    """
    # Make sure we're initialized
    assert self._loaded, "Task must be loaded using load() before calling step()!"

    # We calculate termination conditions first and then rewards
    # (since some rewards can rely on termination conditions to update)
    done, done_info = self._step_termination(env=env, action=action)
    reward, reward_info = self._step_reward(env=env, action=action)

    # Update the internal state of this task
    self._reward = reward
    self._done = done
    self._success = done_info["success"]
    self._info = {
        "reward": reward_info,
        "done": done_info,
    }

    return self._reward, self._done, deepcopy(self._info)

valid_scene_types()

Returns:

Type Description
set of Scene

Scene type(s) that are valid (i.e.: compatible) with this specific task. This will be used to sanity check the task + scene combination at runtime

Source code in omnigibson/tasks/task_base.py
@classproperty
def valid_scene_types(cls):
    """
    Returns:
        set of Scene: Scene type(s) that are valid (i.e.: compatible) with this specific task. This will be
            used to sanity check the task + scene combination at runtime
    """
    raise NotImplementedError()

verify_scene_and_task_config(scene_cfg, task_cfg) classmethod

Runs any necessary sanity checks on the scene and task configs passed; and possibly modifies them in-place

Parameters:

Name Type Description Default
scene_cfg dict

Scene configuration

required
task_cfg dict

Task configuration

required
Source code in omnigibson/tasks/task_base.py
@classmethod
def verify_scene_and_task_config(cls, scene_cfg, task_cfg):
    """
    Runs any necessary sanity checks on the scene and task configs passed; and possibly modifies them in-place

    Args:
        scene_cfg (dict): Scene configuration
        task_cfg (dict): Task configuration
    """
    # Default is no-op
    pass

write_task_metadata()

Store any relevant task metadata that should be written when the simulation state is saved

Source code in omnigibson/tasks/task_base.py
def write_task_metadata(self):
    """
    Store any relevant task metadata that should be written when the simulation state is saved
    """
    # Write to sim
    og.sim.write_metadata(key="task", data=self.task_metadata)