
behavior_task

BehaviorTask

Bases: BaseTask

Task for BEHAVIOR

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| activity_name | None or str | Name of the Behavior Task to instantiate | None |
| activity_definition_id | int | Specification to load for the desired task. For a given Behavior Task, multiple task specifications can be used (i.e.: differing goal conditions, or "ways" to complete a given task). This ID determines which specification to use | 0 |
| activity_instance_id | int | Specific pre-configured instance of a scene to load for this BehaviorTask. This will be used only if @online_object_sampling is False. | 0 |
| predefined_problem | None or str | If specified, the raw string definition of the Behavior Task to load. This will automatically override @activity_name and @activity_definition_id. | None |
| online_object_sampling | bool | Whether to sample object locations online at runtime or not | False |
| highlight_task_relevant_objects | bool | Whether to overlay task-relevant objects in the scene with a colored mask | False |
| termination_config | None or dict | Keyword-mapped configuration to use to generate termination conditions. This should be specific to the task class. Default is None, which corresponds to a default config being used. Note that any keyword required by a specific task class but not specified in the config will automatically be filled in with the default config. See cls.default_termination_config for default values used | None |
| reward_config | None or dict | Keyword-mapped configuration to use to generate reward functions. This should be specific to the task class. Default is None, which corresponds to a default config being used. Note that any keyword required by a specific task class but not specified in the config will automatically be filled in with the default config. See cls.default_reward_config for default values used | None |
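The fallback behavior described for termination_config and reward_config can be pictured as a dictionary merge. The following is a minimal sketch, not OmniGibson's actual implementation: `fill_with_defaults` is a hypothetical helper, and the default values are the ones returned by `default_termination_config` and `default_reward_config` on this class.

```python
# Hypothetical helper illustrating how missing keys could fall back to class defaults.
# The default values mirror BehaviorTask.default_termination_config and
# BehaviorTask.default_reward_config; the merge logic itself is an assumption.
DEFAULT_TERMINATION_CONFIG = {"max_steps": 500}
DEFAULT_REWARD_CONFIG = {"r_potential": 1.0}


def fill_with_defaults(user_config, default_config):
    """Return a new dict in which user-supplied keys override the defaults."""
    merged = dict(default_config)
    if user_config is not None:
        merged.update(user_config)
    return merged


# A partial override keeps the user's value; passing None yields the defaults.
termination_config = fill_with_defaults({"max_steps": 1000}, DEFAULT_TERMINATION_CONFIG)
reward_config = fill_with_defaults(None, DEFAULT_REWARD_CONFIG)
print(termination_config)
print(reward_config)
```

Copying the defaults before updating keeps the class-level defaults untouched when a caller passes its own values.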
Source code in omnigibson/tasks/behavior_task.py
class BehaviorTask(BaseTask):
    """
    Task for BEHAVIOR

    Args:
        activity_name (None or str): Name of the Behavior Task to instantiate
        activity_definition_id (int): Specification to load for the desired task. For a given Behavior Task, multiple task
            specifications can be used (i.e.: differing goal conditions, or "ways" to complete a given task). This
            ID determines which specification to use
        activity_instance_id (int): Specific pre-configured instance of a scene to load for this BehaviorTask. This
            will be used only if @online_object_sampling is False.
        predefined_problem (None or str): If specified, the raw string definition of the Behavior Task to
            load. This will automatically override @activity_name and @activity_definition_id.
        online_object_sampling (bool): whether to sample object locations online at runtime or not
        highlight_task_relevant_objects (bool): whether to overlay task-relevant objects in the scene with a colored mask
        termination_config (None or dict): Keyword-mapped configuration to use to generate termination conditions. This
            should be specific to the task class. Default is None, which corresponds to a default config being used.
            Note that any keyword required by a specific task class but not specified in the config will automatically
            be filled in with the default config. See cls.default_termination_config for default values used
        reward_config (None or dict): Keyword-mapped configuration to use to generate reward functions. This should be
            specific to the task class. Default is None, which corresponds to a default config being used. Note that
            any keyword required by a specific task class but not specified in the config will automatically be filled
            in with the default config. See cls.default_reward_config for default values used
    """

    def __init__(
        self,
        activity_name=None,
        activity_definition_id=0,
        activity_instance_id=0,
        predefined_problem=None,
        online_object_sampling=False,
        highlight_task_relevant_objects=False,
        termination_config=None,
        reward_config=None,
    ):
        # Make sure object states are enabled
        assert gm.ENABLE_OBJECT_STATES, "Must set gm.ENABLE_OBJECT_STATES=True in order to use BehaviorTask!"

        # Make sure task name is valid if not specifying a predefined problem
        if predefined_problem is None:
            assert (
                activity_name is not None
            ), "Activity name must be specified if no predefined_problem is specified for BehaviorTask!"
            assert_valid_key(key=activity_name, valid_keys=BEHAVIOR_ACTIVITIES, name="Behavior Task")
        else:
            # Infer activity name
            activity_name = predefined_problem.split("problem ")[-1].split("-")[0]

        # Initialize relevant variables

        # BDDL
        self.backend = OmniGibsonBDDLBackend()

        # Activity info
        self.activity_name = None
        self.activity_definition_id = activity_definition_id
        self.activity_instance_id = activity_instance_id
        self.activity_conditions = None
        self.activity_initial_conditions = None
        self.activity_goal_conditions = None
        self.ground_goal_state_options = None
        self.feedback = None  # None or str
        self.sampler = None  # BDDLSampler

        # Scene info
        self.scene_name = None

        # Object info
        self.online_object_sampling = online_object_sampling  # bool
        self.highlight_task_relevant_objs = highlight_task_relevant_objects  # bool
        self.object_scope = None  # Maps str to BDDLEntity
        self.object_instance_to_category = None  # Maps str to str
        self.future_obj_instances = None  # set of str

        # Info for demonstration collection
        self.instruction_order = None  # th.tensor of int
        self.currently_viewed_index = None  # int
        self.currently_viewed_instruction = None  # th.tensor scalar: index of the goal condition currently shown
        self.activity_natural_language_goal_conditions = None  # list of str

        # Load the initial behavior configuration
        self.update_activity(
            activity_name=activity_name,
            activity_definition_id=activity_definition_id,
            predefined_problem=predefined_problem,
        )

        # Run super init
        super().__init__(termination_config=termination_config, reward_config=reward_config)

    @classmethod
    def get_cached_activity_scene_filename(
        cls, scene_model, activity_name, activity_definition_id, activity_instance_id
    ):
        """
        Helper method to programmatically construct the scene filename for a given pre-cached task configuration

        Args:
            scene_model (str): Name of the scene (e.g.: Rs_int)
            activity_name (str): Name of the task activity (e.g.: putting_away_halloween_decorations)
            activity_definition_id (int): ID of the task definition
            activity_instance_id (int): ID of the task instance

        Returns:
            str: Filename that, if it exists, should contain the cached activity scene
        """
        return f"{scene_model}_task_{activity_name}_{activity_definition_id}_{activity_instance_id}_template"

    @classmethod
    def verify_scene_and_task_config(cls, scene_cfg, task_cfg):
        # Run super first
        super().verify_scene_and_task_config(scene_cfg=scene_cfg, task_cfg=task_cfg)

        # Possibly modify the scene to load if we're using online_object_sampling
        scene_instance, scene_file = scene_cfg["scene_instance"], scene_cfg["scene_file"]
        activity_name = (
            task_cfg["predefined_problem"].split("problem ")[-1].split("-")[0]
            if task_cfg.get("predefined_problem", None) is not None
            else task_cfg["activity_name"]
        )
        if scene_file is None and scene_instance is None and not task_cfg["online_object_sampling"]:
            scene_instance = cls.get_cached_activity_scene_filename(
                scene_model=scene_cfg.get("scene_model", "Scene"),
                activity_name=activity_name,
                activity_definition_id=task_cfg.get("activity_definition_id", 0),
                activity_instance_id=task_cfg.get("activity_instance_id", 0),
            )
            # Update the value in the scene config
            scene_cfg["scene_instance"] = scene_instance

    @property
    def task_metadata(self):
        # Store mapping from entity name to its corresponding BDDL instance name
        return dict(
            inst_to_name={inst: entity.name for inst, entity in self.object_scope.items() if entity.exists},
        )

    def _create_termination_conditions(self):
        # Initialize termination conditions dict and fill in with Timeout and PredicateGoal
        terminations = dict()

        terminations["timeout"] = Timeout(max_steps=self._termination_config["max_steps"])
        terminations["predicate"] = PredicateGoal(goal_fcn=lambda: self.activity_goal_conditions)

        return terminations

    def _create_reward_functions(self):
        # Initialize reward functions dict and fill in with Potential reward
        rewards = dict()

        rewards["potential"] = PotentialReward(
            potential_fcn=self.get_potential,
            r_potential=self._reward_config["r_potential"],
        )

        return rewards

    def _load(self, env):
        # Initialize the current activity
        success, self.feedback = self.initialize_activity(env=env)
        # assert success, f"Failed to initialize Behavior Activity. Feedback:\n{self.feedback}"

        # Store the scene name
        self.scene_name = env.scene.scene_model if isinstance(env.scene, TraversableScene) else None

        # Highlight any task relevant objects if requested
        if self.highlight_task_relevant_objs:
            for entity in self.object_scope.values():
                if entity.synset == "agent":
                    continue
                if not entity.is_system and entity.exists:
                    entity.highlighted = True

        # Add callbacks to handle internal processing when new systems / objects are added / removed to the scene
        callback_name = f"{self.activity_name}_refresh"
        og.sim.add_callback_on_add_obj(name=callback_name, callback=self._update_bddl_scope_from_added_obj)
        og.sim.add_callback_on_remove_obj(name=callback_name, callback=self._update_bddl_scope_from_removed_obj)

        og.sim.add_callback_on_system_init(name=callback_name, callback=self._update_bddl_scope_from_system_init)
        og.sim.add_callback_on_system_clear(name=callback_name, callback=self._update_bddl_scope_from_system_clear)

    def _load_non_low_dim_observation_space(self):
        # No non-low dim observations so we return an empty dict
        return dict()

    def update_activity(self, activity_name, activity_definition_id, predefined_problem=None):
        """
        Update the active Behavior activity being deployed

        Args:
            activity_name (None or str): Name of the Behavior Task to instantiate
            activity_definition_id (int): Specification to load for the desired task. For a given Behavior Task, multiple task
                specifications can be used (i.e.: differing goal conditions, or "ways" to complete a given task). This
                ID determines which specification to use
            predefined_problem (None or str): If specified, the raw string definition of the Behavior Task to
                load. This will automatically override @activity_name and @activity_definition_id.
        """
        # Update internal variables based on values

        # Activity info
        self.activity_name = activity_name
        self.activity_definition_id = activity_definition_id
        self.activity_conditions = Conditions(
            activity_name,
            activity_definition_id,
            simulator_name="omnigibson",
            predefined_problem=predefined_problem,
        )

        # Get scope, making sure agent is the first entry
        self.object_scope = {"agent.n.01_1": None}
        self.object_scope.update(get_object_scope(self.activity_conditions))

        # Object info
        self.object_instance_to_category = {
            obj_inst: obj_cat
            for obj_cat in self.activity_conditions.parsed_objects
            for obj_inst in self.activity_conditions.parsed_objects[obj_cat]
        }

        # Generate initial and goal conditions
        self.activity_initial_conditions = get_initial_conditions(
            self.activity_conditions, self.backend, self.object_scope
        )
        self.activity_goal_conditions = get_goal_conditions(self.activity_conditions, self.backend, self.object_scope)
        self.ground_goal_state_options = get_ground_goal_state_options(
            self.activity_conditions, self.backend, self.object_scope, self.activity_goal_conditions
        )

        # Demo attributes
        self.instruction_order = th.arange(len(self.activity_conditions.parsed_goal_conditions))
        self.instruction_order = self.instruction_order[th.randperm(self.instruction_order.size(0))]

        self.currently_viewed_index = 0
        self.currently_viewed_instruction = self.instruction_order[self.currently_viewed_index]
        self.activity_natural_language_initial_conditions = get_natural_initial_conditions(self.activity_conditions)
        self.activity_natural_language_goal_conditions = get_natural_goal_conditions(self.activity_conditions)

    def get_potential(self, env):
        """
        Compute task-specific potential: distance to the goal

        Args:
            env (Environment): Current active environment instance

        Returns:
            float: Computed potential
        """
        # Evaluate the first ground goal state option as the potential
        _, satisfied_predicates = evaluate_goal_conditions(self.ground_goal_state_options[0])
        success_score = len(satisfied_predicates["satisfied"]) / (
            len(satisfied_predicates["satisfied"]) + len(satisfied_predicates["unsatisfied"])
        )
        return -success_score

    def initialize_activity(self, env):
        """
        Initializes the desired activity in the current environment @env

        Args:
            env (Environment): Current active environment instance

        Returns:
            2-tuple:
                - bool: Whether the generated scene activity should be accepted or not
                - None or str: Any feedback from the sampling / initialization process
        """
        accept_scene = True
        feedback = None

        # Generate sampler
        self.sampler = BDDLSampler(
            env=env,
            activity_conditions=self.activity_conditions,
            object_scope=self.object_scope,
            backend=self.backend,
        )

        # Compose future objects
        self.future_obj_instances = {
            init_cond.body[1] for init_cond in self.activity_initial_conditions if init_cond.body[0] == "future"
        }

        if self.online_object_sampling:
            # Sample online
            accept_scene, feedback = self.sampler.sample()
            if not accept_scene:
                return accept_scene, feedback
        else:
            # Load existing scene cache and assign object scope accordingly
            self.assign_object_scope_with_cache(env)

        # Generate goal condition with the fully populated self.object_scope
        self.activity_goal_conditions = get_goal_conditions(self.activity_conditions, self.backend, self.object_scope)
        self.ground_goal_state_options = get_ground_goal_state_options(
            self.activity_conditions, self.backend, self.object_scope, self.activity_goal_conditions
        )
        return accept_scene, feedback

    def get_agent(self, env):
        """
        Grab the 0th agent from @env

        Args:
            env (Environment): Current active environment instance

        Returns:
            BaseRobot: The 0th robot from the environment instance
        """
        # We assume the relevant agent is the first agent in the scene
        return env.robots[0]

    def assign_object_scope_with_cache(self, env):
        """
        Assigns objects within the current object scope

        Args:
            env (Environment): Current active environment instance
        """
        # Load task metadata
        inst_to_name = self.load_task_metadata()["inst_to_name"]

        # Assign object_scope based on a cached scene
        for obj_inst in self.object_scope:
            if obj_inst in self.future_obj_instances:
                entity = None
            else:
                assert obj_inst in inst_to_name, (
                    f"BDDL object instance {obj_inst} should exist in cached metadata "
                    f"from loaded scene, but could not be found!"
                )
                name = inst_to_name[obj_inst]
                is_system = name in env.scene.available_systems.keys()
                entity = env.scene.get_system(name) if is_system else env.scene.object_registry("name", name)
            self.object_scope[obj_inst] = BDDLEntity(
                bddl_inst=obj_inst,
                entity=entity,
            )

    def _get_obs(self, env):
        low_dim_obs = dict()

        # Batch rpy calculations for much better efficiency
        objs_exist = {obj: obj.exists for obj in self.object_scope.values() if not obj.is_system}
        objs_rpy = T.quat2euler(
            th.stack(
                [
                    obj.states[Pose].get_value()[1] if obj_exist else th.tensor([0, 0, 0, 1.0])
                    for obj, obj_exist in objs_exist.items()
                ]
            )
        )
        objs_rpy_cos = th.cos(objs_rpy)
        objs_rpy_sin = th.sin(objs_rpy)

        # Always add agent info first
        agent = self.get_agent(env=env)

        for (obj, obj_exist), obj_rpy, obj_rpy_cos, obj_rpy_sin in zip(
            objs_exist.items(), objs_rpy, objs_rpy_cos, objs_rpy_sin
        ):

            # TODO: May need to update checking here to USDObject? Or even baseobject?
            # TODO: How to handle systems as part of obs?
            if obj_exist:
                low_dim_obs[f"{obj.bddl_inst}_real"] = th.tensor([1.0])
                low_dim_obs[f"{obj.bddl_inst}_pos"] = obj.states[Pose].get_value()[0]
                low_dim_obs[f"{obj.bddl_inst}_ori_cos"] = obj_rpy_cos
                low_dim_obs[f"{obj.bddl_inst}_ori_sin"] = obj_rpy_sin
                if obj.name != agent.name:
                    for arm in agent.arm_names:
                        grasping_object = agent.is_grasping(arm=arm, candidate_obj=obj.wrapped_obj)
                        low_dim_obs[f"{obj.bddl_inst}_in_gripper_{arm}"] = th.tensor([float(grasping_object)])
            else:
                low_dim_obs[f"{obj.bddl_inst}_real"] = th.zeros(1)
                low_dim_obs[f"{obj.bddl_inst}_pos"] = th.zeros(3)
                low_dim_obs[f"{obj.bddl_inst}_ori_cos"] = th.zeros(3)
                low_dim_obs[f"{obj.bddl_inst}_ori_sin"] = th.zeros(3)
                for arm in agent.arm_names:
                    low_dim_obs[f"{obj.bddl_inst}_in_gripper_{arm}"] = th.zeros(1)

        return low_dim_obs, dict()

    def _step_termination(self, env, action, info=None):
        # Run super first
        done, info = super()._step_termination(env=env, action=action, info=info)

        # Add additional info
        info["goal_status"] = self._termination_conditions["predicate"].goal_status

        return done, info

    def _update_bddl_scope_from_added_obj(self, obj):
        """
        Internal callback function to be called when new objects are added to the simulator to potentially update internal
        bddl object scope

        Args:
            obj (BaseObject): Newly imported object
        """
        # Iterate over all entities, and if they don't exist, check if any category matches @obj's category, and set it
        # if it does, and immediately return
        for inst, entity in self.object_scope.items():
            if not entity.exists and not entity.is_system and obj.category in set(entity.og_categories):
                entity.set_entity(entity=obj)
                return

    def _update_bddl_scope_from_removed_obj(self, obj):
        """
        Internal callback function to be called when sim._pre_remove_object() is called to potentially update internal
        bddl object scope

        Args:
            obj (BaseObject): Newly removed object
        """
        # Iterate over all entities, and if they exist, check if any name matches @obj's name, and remove it
        # if it does, and immediately return
        for entity in self.object_scope.values():
            if entity.exists and not entity.is_system and obj.name == entity.name:
                entity.clear_entity()
                return

    def _update_bddl_scope_from_system_init(self, system):
        """
        Internal callback function to be called when system.initialize() is called to potentially update internal
        bddl object scope

        Args:
            system (BaseSystem): Newly initialized system
        """
        # Iterate over all entities, and potentially match the system to the scope
        for inst, entity in self.object_scope.items():
            if not entity.exists and entity.is_system and entity.og_categories[0] == system.name:
                entity.set_entity(entity=system)
                return

    def _update_bddl_scope_from_system_clear(self, system):
        """
        Internal callback function to be called when system.clear() is called to potentially update internal
        bddl object scope

        Args:
            system (BaseSystem): Newly cleared system
        """
        # Iterate over all entities, and potentially remove the matched system from the scope
        for inst, entity in self.object_scope.items():
            if entity.exists and entity.is_system and system.name == entity.name:
                entity.clear_entity()
                return

    def show_instruction(self):
        """
        Get current instruction for user

        Returns:
            3-tuple:
                - str: Current goal condition in natural language
                - 3-tuple: (R,G,B) color to assign to text
                - list of BaseObject: Relevant objects for the current instruction
        """
        satisfied = (
            self.currently_viewed_instruction in self._termination_conditions["predicate"].goal_status["satisfied"]
        )
        natural_language_condition = self.activity_natural_language_goal_conditions[self.currently_viewed_instruction]
        objects = self.activity_goal_conditions[self.currently_viewed_instruction].get_relevant_objects()
        text_color = (
            [83.0 / 255.0, 176.0 / 255.0, 72.0 / 255.0] if satisfied else [255.0 / 255.0, 51.0 / 255.0, 51.0 / 255.0]
        )

        return natural_language_condition, text_color, objects

    def iterate_instruction(self):
        """
        Increment the instruction
        """
        self.currently_viewed_index = (self.currently_viewed_index + 1) % len(
            self.activity_conditions.parsed_goal_conditions
        )
        self.currently_viewed_instruction = self.instruction_order[self.currently_viewed_index]

    def save_task(self, path=None, override=False):
        """
        Writes the current scene configuration to a .json file

        Args:
            path (None or str): If specified, absolute fpath to the desired path to write the .json. Default is
                <gm.DATASET_PATH>/scenes/<SCENE_MODEL>/json/...>
            override (bool): Whether to override any files already found at the path to write the task .json
        """
        if path is None:
            assert self.scene_name is not None, "Scene name must be set in order to save task without specifying path"
            fname = self.get_cached_activity_scene_filename(
                scene_model=self.scene_name,
                activity_name=self.activity_name,
                activity_definition_id=self.activity_definition_id,
                activity_instance_id=self.activity_instance_id,
            )
            path = os.path.join(gm.DATASET_PATH, "scenes", self.scene_name, "json", f"{fname}.json")

        if os.path.exists(path) and not override:
            log.warning(f"Scene json already exists at {path}. Use override=True to force writing of new json.")
            return
        # Write metadata and then save
        self.write_task_metadata()
        og.sim.save(json_paths=[path])

    @property
    def name(self):
        """
        Returns:
            str: Name of this task. Defaults to class name
        """
        name_base = super().name

        # Add activity name, def id, and inst id
        return f"{name_base}_{self.activity_name}_{self.activity_definition_id}_{self.activity_instance_id}"

    @classproperty
    def valid_scene_types(cls):
        # Any scene can be used
        return {Scene}

    @classproperty
    def default_termination_config(cls):
        return {
            "max_steps": 500,
        }

    @classproperty
    def default_reward_config(cls):
        return {
            "r_potential": 1.0,
        }
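When a predefined problem is supplied, both `__init__` and `verify_scene_and_task_config` recover the activity name by string-splitting the raw problem definition. The sketch below replays that parsing on a made-up BDDL-style header; `infer_activity_name` is a hypothetical wrapper around the same expression, and the example problem text is invented for illustration.

```python
def infer_activity_name(predefined_problem):
    """Apply the same split-based parsing that BehaviorTask uses on the raw string."""
    # Take everything after the last "problem ", then keep the part before the first "-"
    return predefined_problem.split("problem ")[-1].split("-")[0]


# Illustrative problem header; the activity and its contents are made up.
example_problem = """(define (problem cleaning_table-0)
    (:domain omnigibson)
    (:objects
        table.n.02_1 - table.n.02
    )
)"""

print(infer_activity_name(example_problem))  # cleaning_table
```

Because the parsing cuts at the first hyphen after the name, it relies on the `<activity_name>-<id>` header convention and on the text "problem " not reappearing later in the definition.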

name property

Returns:

| Type | Description |
| --- | --- |
| str | Name of this task. Defaults to class name |

assign_object_scope_with_cache(env)

Assigns objects within the current object scope

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| env | Environment | Current active environment instance | required |

Source code in omnigibson/tasks/behavior_task.py
def assign_object_scope_with_cache(self, env):
    """
    Assigns objects within the current object scope

    Args:
        env (Environment): Current active environment instance
    """
    # Load task metadata
    inst_to_name = self.load_task_metadata()["inst_to_name"]

    # Assign object_scope based on a cached scene
    for obj_inst in self.object_scope:
        if obj_inst in self.future_obj_instances:
            entity = None
        else:
            assert obj_inst in inst_to_name, (
                f"BDDL object instance {obj_inst} should exist in cached metadata "
                f"from loaded scene, but could not be found!"
            )
            name = inst_to_name[obj_inst]
            is_system = name in env.scene.available_systems.keys()
            entity = env.scene.get_system(name) if is_system else env.scene.object_registry("name", name)
        self.object_scope[obj_inst] = BDDLEntity(
            bddl_inst=obj_inst,
            entity=entity,
        )
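The assignment logic above can be traced without a simulator by replacing the scene and the cached metadata with plain dictionaries. This is only a sketch under stated assumptions: every name below is an invented stand-in, and the `(kind, name)` tuples take the place of the `BDDLEntity` wrappers that the real method creates.

```python
# All names and data below are invented stand-ins for the cached metadata and scene.
inst_to_name = {
    "agent.n.01_1": "robot0",
    "apple.n.01_1": "apple_1",
    "water.n.06_1": "water",
}
available_systems = {"water"}  # names the scene would treat as systems
future_obj_instances = {"diced__apple.n.01_1"}  # created later at runtime
object_scope = dict.fromkeys(
    ["agent.n.01_1", "apple.n.01_1", "water.n.06_1", "diced__apple.n.01_1"]
)


def resolve(obj_inst):
    """Return None for future instances, otherwise a (kind, scene_name) pair."""
    if obj_inst in future_obj_instances:
        return None
    assert obj_inst in inst_to_name, f"{obj_inst} missing from cached metadata"
    name = inst_to_name[obj_inst]
    kind = "system" if name in available_systems else "object"
    return kind, name


# Overwrite each scope entry in place, preserving the original key order
for obj_inst in object_scope:
    object_scope[obj_inst] = resolve(obj_inst)

print(object_scope)
```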

get_agent(env)

Grab the 0th agent from @env

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| env | Environment | Current active environment instance | required |

Returns:

| Type | Description |
| --- | --- |
| BaseRobot | The 0th robot from the environment instance |

Source code in omnigibson/tasks/behavior_task.py
def get_agent(self, env):
    """
    Grab the 0th agent from @env

    Args:
        env (Environment): Current active environment instance

    Returns:
        BaseRobot: The 0th robot from the environment instance
    """
    # We assume the relevant agent is the first agent in the scene
    return env.robots[0]

get_cached_activity_scene_filename(scene_model, activity_name, activity_definition_id, activity_instance_id) classmethod

Helper method to programmatically construct the scene filename for a given pre-cached task configuration

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| scene_model | str | Name of the scene (e.g.: Rs_int) | required |
| activity_name | str | Name of the task activity (e.g.: putting_away_halloween_decorations) | required |
| activity_definition_id | int | ID of the task definition | required |
| activity_instance_id | int | ID of the task instance | required |

Returns:

| Type | Description |
| --- | --- |
| str | Filename that, if it exists, should contain the cached activity scene |

Source code in omnigibson/tasks/behavior_task.py
@classmethod
def get_cached_activity_scene_filename(
    cls, scene_model, activity_name, activity_definition_id, activity_instance_id
):
    """
    Helper method to programmatically construct the scene filename for a given pre-cached task configuration

    Args:
        scene_model (str): Name of the scene (e.g.: Rs_int)
        activity_name (str): Name of the task activity (e.g.: putting_away_halloween_decorations)
        activity_definition_id (int): ID of the task definition
        activity_instance_id (int): ID of the task instance

    Returns:
        str: Filename that, if it exists, should contain the cached activity scene
    """
    return f"{scene_model}_task_{activity_name}_{activity_definition_id}_{activity_instance_id}_template"
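To make the naming scheme concrete, the sketch below reproduces the same f-string as a standalone function and joins the result into a path using the layout that `save_task` uses. The function name and the dataset root are placeholders for illustration, not OmniGibson identifiers.

```python
import os


def cached_activity_scene_filename(scene_model, activity_name, activity_definition_id, activity_instance_id):
    """Standalone copy of the f-string used by the classmethod above."""
    return f"{scene_model}_task_{activity_name}_{activity_definition_id}_{activity_instance_id}_template"


fname = cached_activity_scene_filename(
    scene_model="Rs_int",
    activity_name="putting_away_halloween_decorations",
    activity_definition_id=0,
    activity_instance_id=0,
)
print(fname)  # Rs_int_task_putting_away_halloween_decorations_0_0_template

# Placeholder dataset root; save_task uses gm.DATASET_PATH here instead
dataset_path = "path_to_dataset"
json_path = os.path.join(dataset_path, "scenes", "Rs_int", "json", f"{fname}.json")
print(json_path)
```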

get_potential(env)

Compute task-specific potential: distance to the goal

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| env | Environment | Current active environment instance | required |

Returns:

| Type | Description |
| --- | --- |
| float | Computed potential |

Source code in omnigibson/tasks/behavior_task.py
def get_potential(self, env):
    """
    Compute task-specific potential: distance to the goal

    Args:
        env (Environment): Current active environment instance

    Returns:
        float: Computed potential
    """
    # Evaluate the first ground goal state option as the potential
    _, satisfied_predicates = evaluate_goal_conditions(self.ground_goal_state_options[0])
    success_score = len(satisfied_predicates["satisfied"]) / (
        len(satisfied_predicates["satisfied"]) + len(satisfied_predicates["unsatisfied"])
    )
    return -success_score
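The potential is the negated fraction of goal predicates currently satisfied, so it falls from 0 toward -1 as the task progresses. The sketch below isolates that arithmetic; `potential_from_status` is a hypothetical helper, and the index lists stand in for the "satisfied" and "unsatisfied" entries that `evaluate_goal_conditions` returns.

```python
def potential_from_status(satisfied, unsatisfied):
    """Negated fraction of satisfied predicates, mirroring get_potential."""
    success_score = len(satisfied) / (len(satisfied) + len(unsatisfied))
    return -success_score


# Invented goal-predicate indices for illustration
print(potential_from_status(satisfied=[0], unsatisfied=[1, 2, 3]))  # -0.25
print(potential_from_status(satisfied=[0, 2], unsatisfied=[1, 3]))  # -0.5
print(potential_from_status(satisfied=[0, 1, 2, 3], unsatisfied=[]))  # -1.0
```

As in the original method, a goal with no predicates at all would divide by zero, so this sketch assumes at least one goal predicate exists.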

initialize_activity(env)

Initializes the desired activity in the current environment @env

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| env | Environment | Current active environment instance | required |

Returns:

Type Description
2-tuple
  • bool: Whether the generated scene activity should be accepted or not
  • dict: Any feedback from the sampling / initialization process
Source code in omnigibson/tasks/behavior_task.py
def initialize_activity(self, env):
    """
    Initializes the desired activity in the current environment @env

    Args:
        env (Environment): Current active environment instance

    Returns:
        2-tuple:
            - bool: Whether the generated scene activity should be accepted or not
            - dict: Any feedback from the sampling / initialization process
    """
    accept_scene = True
    feedback = None

    # Generate sampler
    self.sampler = BDDLSampler(
        env=env,
        activity_conditions=self.activity_conditions,
        object_scope=self.object_scope,
        backend=self.backend,
    )

    # Compose future objects
    self.future_obj_instances = {
        init_cond.body[1] for init_cond in self.activity_initial_conditions if init_cond.body[0] == "future"
    }

    if self.online_object_sampling:
        # Sample online
        accept_scene, feedback = self.sampler.sample()
        if not accept_scene:
            return accept_scene, feedback
    else:
        # Load existing scene cache and assign object scope accordingly
        self.assign_object_scope_with_cache(env)

    # Generate goal condition with the fully populated self.object_scope
    self.activity_goal_conditions = get_goal_conditions(self.activity_conditions, self.backend, self.object_scope)
    self.ground_goal_state_options = get_ground_goal_state_options(
        self.activity_conditions, self.backend, self.object_scope, self.activity_goal_conditions
    )
    return accept_scene, feedback
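
The "future objects" step can be illustrated on its own. The sketch below assumes only that each initial condition exposes a `body` list whose first element is the predicate name, as the set comprehension above implies; the `Condition` namedtuple and the instance names are hypothetical stand-ins, not real BDDL objects.

```python
from collections import namedtuple

# Hypothetical stand-in for a parsed initial condition; only `body` is modeled
Condition = namedtuple("Condition", ["body"])

initial_conditions = [
    Condition(body=["ontop", "obj_a_1", "obj_b_1"]),
    Condition(body=["future", "obj_c_1"]),
    Condition(body=["future", "obj_d_1"]),
]

# Keep the object named by every condition whose predicate is "future"
future_obj_instances = {cond.body[1] for cond in initial_conditions if cond.body[0] == "future"}
print(sorted(future_obj_instances))  # ['obj_c_1', 'obj_d_1']
```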

iterate_instruction()

Increment the instruction

Source code in omnigibson/tasks/behavior_task.py
def iterate_instruction(self):
    """
    Increment the instruction
    """
    self.currently_viewed_index = (self.currently_viewed_index + 1) % len(
        self.activity_conditions.parsed_goal_conditions
    )
    self.currently_viewed_instruction = self.instruction_order[self.currently_viewed_index]
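
The wrap-around behavior can be sketched with plain Python lists. The shuffled order below is illustrative; in the class it is a random permutation of the goal condition indices, so its length matches the modulus used above.

```python
instruction_order = [2, 0, 1]  # illustrative shuffled order over three goal conditions
index = 0  # the first instruction shown is instruction_order[0]

viewed = []
for _ in range(4):
    # Advance by one, wrapping back to the start after the last instruction
    index = (index + 1) % len(instruction_order)
    viewed.append(instruction_order[index])

print(viewed)  # [0, 1, 2, 0]
```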

save_task(path=None, override=False)

Writes the current scene configuration to a .json file

Parameters:

Name Type Description Default
path None or str

If specified, absolute path at which to write the .json. Default is <gm.DATASET_PATH>/scenes/<SCENE_MODEL>/json/...

None
override bool

Whether to overwrite any file already found at the path to write the task .json

False
Source code in omnigibson/tasks/behavior_task.py
def save_task(self, path=None, override=False):
    """
    Writes the current scene configuration to a .json file

    Args:
        path (None or str): If specified, absolute path at which to write the .json. Default is
            <gm.DATASET_PATH>/scenes/<SCENE_MODEL>/json/...
        override (bool): Whether to overwrite any file already found at the path to write the task .json
    """
    if path is None:
        assert self.scene_name is not None, "Scene name must be set in order to save task without specifying path"
        fname = self.get_cached_activity_scene_filename(
            scene_model=self.scene_name,
            activity_name=self.activity_name,
            activity_definition_id=self.activity_definition_id,
            activity_instance_id=self.activity_instance_id,
        )
        path = os.path.join(gm.DATASET_PATH, "scenes", self.scene_name, "json", f"{fname}.json")

    if os.path.exists(path) and not override:
        log.warning(f"Scene json already exists at {path}. Use override=True to force writing of new json.")
        return
    # Write metadata and then save
    self.write_task_metadata()
    og.sim.save(json_paths=[path])
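
The default path construction and the overwrite guard can be exercised without a simulator. This is a sketch under stated assumptions: a temporary directory stands in for `gm.DATASET_PATH`, and the helpers `resolve_task_json_path` and `should_write` are hypothetical names that mirror the logic above without calling `og.sim.save`.

```python
import os
import tempfile


def resolve_task_json_path(dataset_path, scene_name, fname):
    # Same layout as the default branch: <dataset>/scenes/<scene>/json/<fname>.json
    return os.path.join(dataset_path, "scenes", scene_name, "json", f"{fname}.json")


def should_write(path, override=False):
    # Writing is skipped only when the file exists and override is False
    return override or not os.path.exists(path)


with tempfile.TemporaryDirectory() as root:
    path = resolve_task_json_path(root, "Rs_int", "Rs_int_task_putting_away_halloween_decorations_0_0_template")
    os.makedirs(os.path.dirname(path))
    first = should_write(path)                  # nothing on disk yet
    open(path, "w").close()                     # simulate an earlier save
    blocked = should_write(path)                # existing file, no override
    forced = should_write(path, override=True)  # existing file, override requested
```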

show_instruction()

Get current instruction for user

Returns:

Type Description
3-tuple
  • str: Current goal condition in natural language
  • 3-tuple: (R,G,B) color to assign to text
  • list of BaseObject: Relevant objects for the current instruction
Source code in omnigibson/tasks/behavior_task.py
def show_instruction(self):
    """
    Get current instruction for user

    Returns:
        3-tuple:
            - str: Current goal condition in natural language
            - 3-tuple: (R,G,B) color to assign to text
            - list of BaseObject: Relevant objects for the current instruction
    """
    satisfied = (
        self.currently_viewed_instruction in self._termination_conditions["predicate"].goal_status["satisfied"]
    )
    natural_language_condition = self.activity_natural_language_goal_conditions[self.currently_viewed_instruction]
    objects = self.activity_goal_conditions[self.currently_viewed_instruction].get_relevant_objects()
    text_color = (
        [83.0 / 255.0, 176.0 / 255.0, 72.0 / 255.0] if satisfied else [255.0 / 255.0, 51.0 / 255.0, 51.0 / 255.0]
    )

    return natural_language_condition, text_color, objects
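
The color choice reduces to a membership test. The sketch below reuses the two RGB values from the code above (components normalized to the range 0 to 1); the constant and function names are hypothetical.

```python
SATISFIED_RGB = [83.0 / 255.0, 176.0 / 255.0, 72.0 / 255.0]     # green
UNSATISFIED_RGB = [255.0 / 255.0, 51.0 / 255.0, 51.0 / 255.0]   # red


def instruction_color(instruction_idx, satisfied_indices):
    # Green if the viewed goal condition is currently satisfied, red otherwise
    return SATISFIED_RGB if instruction_idx in satisfied_indices else UNSATISFIED_RGB


color_done = instruction_color(1, [0, 1])
color_pending = instruction_color(2, [0, 1])
```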

update_activity(activity_name, activity_definition_id, predefined_problem=None)

Update the active Behavior activity being deployed

Parameters:

Name Type Description Default
activity_name None or str

Name of the Behavior Task to instantiate

required
activity_definition_id int

Specification to load for the desired task. For a given Behavior Task, multiple task specifications can be used (e.g., differing goal conditions, or "ways" to complete a given task). This ID determines which specification to use

required
predefined_problem None or str

If specified, the raw string definition of the Behavior Task to load. This automatically overrides @activity_name and @activity_definition_id.

None
Source code in omnigibson/tasks/behavior_task.py
def update_activity(self, activity_name, activity_definition_id, predefined_problem=None):
    """
    Update the active Behavior activity being deployed

    Args:
        activity_name (None or str): Name of the Behavior Task to instantiate
        activity_definition_id (int): Specification to load for the desired task. For a given Behavior Task, multiple task
            specifications can be used (i.e.: differing goal conditions, or "ways" to complete a given task). This
            ID determines which specification to use
        predefined_problem (None or str): If specified, the raw string definition of the Behavior Task to
            load. This automatically overrides @activity_name and @activity_definition_id.
    """
    # Update internal variables based on values

    # Activity info
    self.activity_name = activity_name
    self.activity_definition_id = activity_definition_id
    self.activity_conditions = Conditions(
        activity_name,
        activity_definition_id,
        simulator_name="omnigibson",
        predefined_problem=predefined_problem,
    )

    # Get scope, making sure agent is the first entry
    self.object_scope = {"agent.n.01_1": None}
    self.object_scope.update(get_object_scope(self.activity_conditions))

    # Object info
    self.object_instance_to_category = {
        obj_inst: obj_cat
        for obj_cat in self.activity_conditions.parsed_objects
        for obj_inst in self.activity_conditions.parsed_objects[obj_cat]
    }

    # Generate initial and goal conditions
    self.activity_initial_conditions = get_initial_conditions(
        self.activity_conditions, self.backend, self.object_scope
    )
    self.activity_goal_conditions = get_goal_conditions(self.activity_conditions, self.backend, self.object_scope)
    self.ground_goal_state_options = get_ground_goal_state_options(
        self.activity_conditions, self.backend, self.object_scope, self.activity_goal_conditions
    )

    # Demo attributes
    self.instruction_order = th.arange(len(self.activity_conditions.parsed_goal_conditions))
    self.instruction_order = self.instruction_order[th.randperm(self.instruction_order.size(0))]

    self.currently_viewed_index = 0
    self.currently_viewed_instruction = self.instruction_order[self.currently_viewed_index]
    self.activity_natural_language_initial_conditions = get_natural_initial_conditions(self.activity_conditions)
    self.activity_natural_language_goal_conditions = get_natural_goal_conditions(self.activity_conditions)
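
Two of the bookkeeping steps above are easy to reproduce with plain dicts. This sketch assumes `parsed_objects` maps each category to a list of instance names, as the nested comprehension implies. The category and instance names are illustrative, and filling the scope with `None` values is an assumption, since the return value of `get_object_scope` is not shown here.

```python
# Illustrative category -> instances mapping (stand-in for activity_conditions.parsed_objects)
parsed_objects = {"cat_a": ["cat_a_1", "cat_a_2"], "cat_b": ["cat_b_1"]}

# Invert it into instance -> category, as in the "Object info" step
object_instance_to_category = {
    obj_inst: obj_cat for obj_cat, obj_insts in parsed_objects.items() for obj_inst in obj_insts
}

# Seeding the dict with the agent first keeps it as the first entry,
# because Python dicts preserve insertion order
object_scope = {"agent.n.01_1": None}
object_scope.update({obj_inst: None for obj_inst in object_instance_to_category})

print(list(object_scope))  # ['agent.n.01_1', 'cat_a_1', 'cat_a_2', 'cat_b_1']
```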