Skip to content

stateful_object

FlowEmitterLayerRegistry

Registry for flow emitter layers. This is used to ensure that all flow emitters are placed on unique layers, so that they do not interfere with each other.

Source code in omnigibson/objects/stateful_object.py
class FlowEmitterLayerRegistry:
    """
    Registry for flow emitter layers. This is used to ensure that all flow emitters are placed on unique layers, so that
    they do not interfere with each other.
    """

    def __init__(self):
        self._layer = 0

    def __call__(self):
        self._layer += 1
        return self._layer

StatefulObject

Bases: BaseObject

Objects that support object states.

Source code in omnigibson/objects/stateful_object.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
class StatefulObject(BaseObject):
    """Objects that support object states."""

    def __init__(
        self,
        name,
        relative_prim_path=None,
        category="object",
        scale=None,
        visible=True,
        fixed_base=False,
        visual_only=False,
        kinematic_only=None,
        self_collisions=False,
        prim_type=PrimType.RIGID,
        load_config=None,
        abilities=None,
        include_default_states=True,
        **kwargs,
    ):
        """
        Args:
            name (str): Name for the object. Names need to be unique per scene
            relative_prim_path (None or str): The path relative to its scene prim for this object. If not specified, it defaults to /<name>.
            category (str): Category for the object. Defaults to "object".
            scale (None or float or 3-array): if specified, sets either the uniform (float) or x,y,z (3-array) scale
                for this object. A single number corresponds to uniform scaling along the x,y,z axes, whereas a
                3-array specifies per-axis scaling.
            visible (bool): whether to render this object or not in the stage
            fixed_base (bool): whether to fix the base of this object or not
            visual_only (bool): Whether this object should be visual only (and not collide with any other objects)
            kinematic_only (None or bool): Whether this object should be kinematic only (and not get affected by any
                collisions). If None, then this value will be set to True if @fixed_base is True and some other criteria
                are satisfied (see object_base.py post_load function), else False.
            self_collisions (bool): Whether to enable self collisions for this object
            prim_type (PrimType): Which type of prim the object is, Valid options are: {PrimType.RIGID, PrimType.CLOTH}
            load_config (None or dict): If specified, should contain keyword-mapped values that are relevant for
                loading this prim at runtime.
            abilities (None or dict): If specified, manually adds specific object states to this object. It should be
                a dict in the form of {ability: {param: value}} containing object abilities and parameters to pass to
                the object state instance constructor.
            include_default_states (bool): whether to include the default object states from @get_default_states
            kwargs (dict): Additional keyword arguments that are used for other super() calls from subclasses, allowing
                for flexible compositions of various object subclasses (e.g.: Robot is USDObject + ControllableObject).
        """
        # Values that will be filled later
        self._states = None
        self._emitters = dict()
        self._visual_states = None
        self._current_texture_state = None
        self._include_default_states = include_default_states

        # Load abilities from taxonomy if needed & possible
        if abilities is None:
            abilities = {}
            taxonomy_class = OBJECT_TAXONOMY.get_synset_from_category(category)
            if taxonomy_class is not None:
                abilities = OBJECT_TAXONOMY.get_abilities(taxonomy_class)
        assert isinstance(abilities, dict), "Object abilities must be in dictionary form."
        self._abilities = abilities

        # Run super init
        super().__init__(
            relative_prim_path=relative_prim_path,
            name=name,
            category=category,
            scale=scale,
            visible=visible,
            fixed_base=fixed_base,
            visual_only=visual_only,
            kinematic_only=kinematic_only,
            self_collisions=self_collisions,
            prim_type=prim_type,
            load_config=load_config,
            **kwargs,
        )

    def _post_load(self):
        # Run super first
        super()._post_load()

        # Prepare the object states
        self._states = {}
        self.prepare_object_states()

    def _initialize(self):
        # Run super first
        super()._initialize()

        # Initialize all states
        for state in self._states.values():
            state.initialize()

        # Check whether this object requires any visual updates
        states_set = set(self.states)
        self._visual_states = states_set & get_visual_states()

        # If we require visual updates, possibly create additional APIs
        if len(self._visual_states) > 0:
            if len(states_set & get_steam_states()) > 0:
                self._create_emitter_apis(EmitterType.STEAM)

            if len(states_set & get_fire_states()) > 0:
                self._create_emitter_apis(EmitterType.FIRE)

    def add_state(self, state):
        """
        Adds state @state with name @name to self.states.

        Args:
            state (ObjectStateBase): Object state instance to add to this object
        """
        assert self._states is not None, "Cannot add state since states have not been initialized yet!"
        assert state.__class__ not in self._states, (
            f"State {state.__class__.__name__} " f"has already been added to this object!"
        )
        self._states[state.__class__] = state

    @property
    def states(self):
        """
        Get the current states of this object.

        Returns:
            dict: Keyword-mapped states for this object
        """
        return self._states

    @property
    def abilities(self):
        """
        Returns:
            dict: Dictionary mapping ability name to ability arguments for this object
        """
        return self._abilities

    @property
    def is_active(self):
        """
        Returns:
            bool: True if this object is currently considered active -- e.g.: if this object is currently awake
        """
        return super().is_active or self in self.scene.updated_state_objects

    def state_updated(self):
        """
        Adds this object to this object's scene's updated_state_objects set -- generally called externally
        by owned object state instances when its state is updated. This is useful for tracking when this object
        has had its state updated within the last simulation step
        """
        self.scene.updated_state_objects.add(self)

    def prepare_object_states(self):
        """
        Prepare the state dictionary for an object by generating the appropriate
        object state instances.

        This uses the abilities of the object and the state dependency graph to
        find & instantiate all relevant states.
        """
        states_info = (
            {state_type: {"ability": None, "params": dict()} for state_type in get_default_states()}
            if self._include_default_states
            else dict()
        )

        # Map the state type (class) to ability name and params
        if gm.ENABLE_OBJECT_STATES:
            for ability in tuple(self._abilities.keys()):
                # First, sanity check all ability requirements
                compatible = True
                for requirement in get_requirements_for_ability(ability):
                    compatible, reason = requirement.is_compatible(obj=self)
                    if not compatible:
                        # Print out warning and pop ability
                        log.warning(
                            f"Ability '{ability}' is incompatible with obj {self.name}, "
                            f"because requirement {requirement.__name__} was not met. Reason: {reason}"
                        )
                        self._abilities.pop(ability)
                        break
                if compatible:
                    params = self._abilities[ability]
                    for state_type in get_states_for_ability(ability):
                        states_info[state_type] = {
                            "ability": ability,
                            "params": state_type.postprocess_ability_params(params, self.scene),
                        }

        # Add the dependencies into the list, too, and sort based on the dependency chain
        # Must iterate over explicit tuple since dictionary changes size mid-iteration
        for state_type in tuple(states_info.keys()):
            # Add each state's dependencies, too. Note that only required dependencies are explicitly added, but both
            # required AND optional dependencies are checked / sorted
            for dependency in state_type.get_dependencies():
                if dependency not in states_info:
                    states_info[dependency] = {"ability": None, "params": dict()}

        # Iterate over all sorted state types, generating the states in topological order.
        self._states = dict()
        for state_type in get_states_by_dependency_order(states=states_info):
            # Skip over any types that are not in our info dict -- these correspond to optional dependencies
            if state_type not in states_info:
                continue

            relevant_params = extract_class_init_kwargs_from_dict(
                cls=state_type, dic=states_info[state_type]["params"], copy=False
            )
            compatible, reason = state_type.is_compatible(obj=self, **relevant_params)
            if compatible:
                self._states[state_type] = state_type(obj=self, **relevant_params)
            else:
                log.warning(f"State {state_type.__name__} is incompatible with obj {self.name}. Reason: {reason}")
                # Remove the ability if it exists
                # Note that the object may still have some of the states related to the desired ability. In this way,
                # we guarantee that the existence of a certain ability in self.abilities means at ALL corresponding
                # object state dependencies are met by the underlying object asset
                ability = states_info[state_type]["ability"]
                if ability in self._abilities:
                    self._abilities.pop(ability)

    def _create_emitter_apis(self, emitter_type):
        """
        Create necessary prims and apis for steam effects.

        Args:
            emitter_type (EmitterType): Emitter to create
        """
        # Make sure that flow setting is enabled.
        renderer_setting = RendererSettings()
        renderer_setting.common_settings.flow_settings.enable()

        # Specify emitter config.
        emitter_config = {}
        bbox_extent_local = self.native_bbox if hasattr(self, "native_bbox") else self.aabb_extent / self.scale
        if emitter_type == EmitterType.FIRE:
            fire_at_metalink = True
            if OnFire in self.states:
                # Note whether the heat source link is explicitly set
                link = self.states[OnFire].link
                fire_at_metalink = link != self.root_link
            elif HeatSourceOrSink in self.states:
                # Only apply fire to non-root-link (i.e.: explicitly specified) heat source links
                # Otherwise, immediately return
                link = self.states[HeatSourceOrSink].link
                if link == self.root_link:
                    return
            else:
                raise ValueError("Unknown fire state")

            emitter_config["name"] = "flowEmitterSphere"
            emitter_config["type"] = "FlowEmitterSphere"
            emitter_config["position"] = (
                (0.0, 0.0, 0.0) if fire_at_metalink else (0.0, 0.0, bbox_extent_local[2] * m.FIRE_EMITTER_HEIGHT_RATIO)
            )
            emitter_config["fuel"] = 0.6
            emitter_config["coupleRateFuel"] = 1.2
            emitter_config["buoyancyPerTemp"] = 0.04
            emitter_config["burnPerTemp"] = 4
            emitter_config["gravity"] = (0, 0, -60.0)
            emitter_config["constantMask"] = 5.0
            emitter_config["attenuation"] = 0.5
        elif emitter_type == EmitterType.STEAM:
            link = self.root_link
            emitter_config["name"] = "flowEmitterBox"
            emitter_config["type"] = "FlowEmitterBox"
            emitter_config["position"] = (0.0, 0.0, bbox_extent_local[2] * m.STEAM_EMITTER_HEIGHT_RATIO)
            emitter_config["fuel"] = 1.0
            emitter_config["coupleRateFuel"] = 0.5
            emitter_config["buoyancyPerTemp"] = 0.05
            emitter_config["burnPerTemp"] = 0.5
            emitter_config["gravity"] = (0, 0, -50.0)
            emitter_config["constantMask"] = 10.0
            emitter_config["attenuation"] = 1.5
        else:
            raise ValueError("Currently, only EmitterTypes FIRE and STEAM are supported!")

        # Define prim paths.
        # The flow system is created under the root link so that it automatically updates its pose as the object moves
        flowEmitter_prim_path = f"{link.prim_path}/{emitter_config['name']}"
        flowSimulate_prim_path = f"{link.prim_path}/flowSimulate"
        flowOffscreen_prim_path = f"{link.prim_path}/flowOffscreen"
        flowRender_prim_path = f"{link.prim_path}/flowRender"

        # Define prims.
        stage = og.sim.stage
        emitter = stage.DefinePrim(flowEmitter_prim_path, emitter_config["type"])
        simulate = stage.DefinePrim(flowSimulate_prim_path, "FlowSimulate")
        offscreen = stage.DefinePrim(flowOffscreen_prim_path, "FlowOffscreen")
        renderer = stage.DefinePrim(flowRender_prim_path, "FlowRender")
        advection = stage.DefinePrim(flowSimulate_prim_path + "/advection", "FlowAdvectionCombustionParams")
        smoke = stage.DefinePrim(flowSimulate_prim_path + "/advection/smoke", "FlowAdvectionCombustionParams")
        vorticity = stage.DefinePrim(flowSimulate_prim_path + "/vorticity", "FlowVorticityParams")
        rayMarch = stage.DefinePrim(flowRender_prim_path + "/rayMarch", "FlowRayMarchParams")
        colormap = stage.DefinePrim(flowOffscreen_prim_path + "/colormap", "FlowRayMarchColormapParams")

        self._emitters[emitter_type] = emitter

        layer_number = LAYER_REGISTRY()

        # Update emitter general settings.
        emitter.CreateAttribute("enabled", lazy.pxr.Sdf.ValueTypeNames.Bool, False).Set(False)
        emitter.CreateAttribute("position", lazy.pxr.Sdf.ValueTypeNames.Float3, False).Set(emitter_config["position"])
        emitter.CreateAttribute("fuel", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(emitter_config["fuel"])
        emitter.CreateAttribute("coupleRateFuel", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(
            emitter_config["coupleRateFuel"]
        )
        emitter.CreateAttribute("coupleRateVelocity", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(2.0)
        emitter.CreateAttribute("velocity", lazy.pxr.Sdf.ValueTypeNames.Float3, False).Set((0, 0, 0))
        emitter.CreateAttribute("layer", lazy.pxr.Sdf.ValueTypeNames.Int, False).Set(layer_number)
        simulate.CreateAttribute("layer", lazy.pxr.Sdf.ValueTypeNames.Int, False).Set(layer_number)
        offscreen.CreateAttribute("layer", lazy.pxr.Sdf.ValueTypeNames.Int, False).Set(layer_number)
        renderer.CreateAttribute("layer", lazy.pxr.Sdf.ValueTypeNames.Int, False).Set(layer_number)
        advection.CreateAttribute("buoyancyPerTemp", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(
            emitter_config["buoyancyPerTemp"]
        )
        advection.CreateAttribute("burnPerTemp", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(
            emitter_config["burnPerTemp"]
        )
        advection.CreateAttribute("gravity", lazy.pxr.Sdf.ValueTypeNames.Float3, False).Set(emitter_config["gravity"])
        vorticity.CreateAttribute("constantMask", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(
            emitter_config["constantMask"]
        )
        rayMarch.CreateAttribute("attenuation", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(
            emitter_config["attenuation"]
        )

        # Update emitter unique settings.
        if emitter_type == EmitterType.FIRE:
            # Radius is in the absolute world coordinate even though the fire is under the link frame.
            # In other words, scaling the object doesn't change the fire radius.
            if fire_at_metalink:
                # TODO: get radius of heat_source_link from metadata.
                radius = 0.05
            else:
                bbox_extent_world = self.native_bbox * self.scale if hasattr(self, "native_bbox") else self.aabb_extent
                # Radius is the average x-y half-extent of the object
                radius = float(th.mean(bbox_extent_world[:2]) / 2.0)
            emitter.CreateAttribute("radius", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(radius)
            simulate.CreateAttribute("densityCellSize", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(radius * 0.2)
            smoke.CreateAttribute("fade", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(2.0)
            # Set fire colormap.
            rgbaPoints = []
            rgbaPoints.append(lazy.pxr.Gf.Vec4f(0.0154, 0.0177, 0.0154, 0.004902))
            rgbaPoints.append(lazy.pxr.Gf.Vec4f(0.03575, 0.03575, 0.03575, 0.504902))
            rgbaPoints.append(lazy.pxr.Gf.Vec4f(0.03575, 0.03575, 0.03575, 0.504902))
            rgbaPoints.append(lazy.pxr.Gf.Vec4f(1, 0.1594, 0.0134, 0.8))
            rgbaPoints.append(lazy.pxr.Gf.Vec4f(13.53, 2.99, 0.12599, 0.8))
            rgbaPoints.append(lazy.pxr.Gf.Vec4f(78, 39, 6.1, 0.7))
            colormap.CreateAttribute("rgbaPoints", lazy.pxr.Sdf.ValueTypeNames.Float4Array, False).Set(rgbaPoints)
        elif emitter_type == EmitterType.STEAM:
            emitter.CreateAttribute("halfSize", lazy.pxr.Sdf.ValueTypeNames.Float3, False).Set(
                tuple(bbox_extent_local * th.tensor(m.STEAM_EMITTER_SIZE_RATIO) / 2.0)
            )
            simulate.CreateAttribute("densityCellSize", lazy.pxr.Sdf.ValueTypeNames.Float, False).Set(
                bbox_extent_local[2].item() * m.STEAM_EMITTER_DENSITY_CELL_RATIO
            )

    def set_emitter_enabled(self, emitter_type, value):
        """
        Enable/disable the emitter prim for fire/steam effect.

        Args:
            emitter_type (EmitterType): Emitter to set
            value (bool): Value to set
        """
        if emitter_type not in self._emitters:
            return
        if value != self._emitters[emitter_type].GetAttribute("enabled").Get():
            self._emitters[emitter_type].GetAttribute("enabled").Set(value)

    def get_textures(self):
        """
        Gets prim's texture files.

        Returns:
            list of str: List of texture file paths
        """
        return [material.diffuse_texture for material in self.materials if material.diffuse_texture is not None]

    def update_visuals(self):
        """
        Update the prim's visuals (texture change, steam/fire effects, etc).
        Should be called after all the states are updated.
        """
        if len(self._visual_states) > 0:
            texture_change_states = []
            emitter_enabled = defaultdict(bool)
            for state_type in self._visual_states:
                state = self.states[state_type]
                if state_type in get_texture_change_states():
                    if state_type == Saturated:
                        for particle_system in self.scene.active_systems.values():
                            if state.get_value(particle_system):
                                texture_change_states.append(state)
                                # Only need to do this once, since soaked handles all fluid systems
                                break
                    elif state.get_value():
                        texture_change_states.append(state)
                if state_type in get_steam_states():
                    emitter_enabled[EmitterType.STEAM] |= state.get_value()
                if state_type in get_fire_states():
                    emitter_enabled[EmitterType.FIRE] |= state.get_value()

            for emitter_type in emitter_enabled:
                self.set_emitter_enabled(emitter_type, emitter_enabled[emitter_type])

            texture_change_states.sort(key=lambda s: get_texture_change_priority()[s.__class__])
            object_state = texture_change_states[-1] if len(texture_change_states) > 0 else None

            # Only update our texture change if it's a different object state than the one we already have
            if object_state != self._current_texture_state:
                self._update_texture_change(object_state)
                self._current_texture_state = object_state

    def _update_texture_change(self, object_state):
        """
        Update the texture based on the given object_state. E.g. if object_state is Frozen, update the diffuse color
        to match the frozen state. If object_state is None, update the diffuse color to the default value. It modifies
        the current albedo map by adding and scaling the values. See @self._update_albedo_value for details.

        Args:
            object_state (BooleanStateMixin or None): the object state that the diffuse color should match to
        """
        for material in self.materials:
            self._update_albedo_value(object_state, material)

    @staticmethod
    def _update_albedo_value(object_state, material):
        """
        Update the albedo value based on the given object_state. The final albedo value is
        albedo_value = diffuse_tint * (albedo_value + albedo_add)

        Args:
            object_state (BooleanStateMixin or None): the object state that the diffuse color should match to
            material (MaterialPrim): the material to use to update the albedo value
        """
        if object_state is None:
            # This restore the albedo map to its original value
            albedo_add = 0.0
            diffuse_tint = th.tensor([1.0, 1.0, 1.0])
        else:
            # Query the object state for the parameters
            albedo_add, diffuse_tint = object_state.get_texture_change_params()

        if material.is_glass:
            if not th.allclose(material.glass_color, diffuse_tint):
                material.glass_color = diffuse_tint

        else:
            if material.albedo_add != albedo_add:
                material.albedo_add = albedo_add

            if not th.allclose(material.diffuse_tint, diffuse_tint):
                material.diffuse_tint = diffuse_tint

    def remove(self):
        # Run super
        super().remove()

        # Iterate over all states and run their remove call
        for state_instance in self._states.values():
            state_instance.remove()

    def _dump_state(self):
        # Grab state from super class
        state = super()._dump_state()

        # Also add non-kinematic states
        non_kin_states = dict()
        for state_type, state_instance in self._states.items():
            if state_instance.stateful:
                non_kin_states[get_state_name(state_type)] = state_instance.dump_state(serialized=False)

        state["non_kin"] = non_kin_states

        return state

    def _load_state(self, state):
        # Call super method first
        super()._load_state(state=state)

        # Load non-kinematic states
        self.load_non_kin_state(state)

    def load_non_kin_state(self, state):
        # Load all states that are stateful
        for state_type, state_instance in self._states.items():
            state_name = get_state_name(state_type)
            if state_instance.stateful:
                if state_name in state["non_kin"]:
                    state_instance.load_state(state=state["non_kin"][state_name], serialized=False)
                else:
                    log.debug(f"Missing object state [{state_name}] in the state dump for obj {self.name}")

        # Clear cache after loading state
        self.clear_states_cache()

    def serialize(self, state):
        # Call super method first
        state_flat = super().serialize(state=state)

        # Iterate over all states and serialize them individually
        non_kin_state_flat = (
            th.cat(
                [
                    self._states[REGISTERED_OBJECT_STATES[state_name]].serialize(state_dict)
                    for state_name, state_dict in state["non_kin"].items()
                ]
            )
            if len(state["non_kin"]) > 0
            else th.empty(0)
        )

        # Combine these two arrays
        return th.cat([state_flat, non_kin_state_flat])

    def deserialize(self, state):
        # Call super method first
        state_dic, idx = super().deserialize(state=state)

        # Iterate over all states and deserialize their states if they're stateful
        non_kin_state_dic = dict()
        for state_type, state_instance in self._states.items():
            state_name = get_state_name(state_type)
            if state_instance.stateful:
                non_kin_state_dic[state_name], deserialized_items = state_instance.deserialize(state[idx:])
                idx += deserialized_items
        state_dic["non_kin"] = non_kin_state_dic

        return state_dic, idx

    def clear_states_cache(self):
        """
        Clears the internal cache from all owned states
        """
        # Check self._states just in case states have not been initialized yet.
        if not self._states:
            return
        for _, obj_state in self._states.items():
            obj_state.clear_cache()

    def set_position_orientation(
        self, position=None, orientation=None, frame: Literal["world", "parent", "scene"] = "world"
    ):
        """
        Set the position and orientation of stateful object.

        Args:
            position (None or 3-array): The position to set the object to. If None, the position is not changed.
            orientation (None or 4-array): The orientation to set the object to. If None, the orientation is not changed.
            frame (Literal): The frame in which to set the position and orientation. Defaults to world. parent frame
            set position relative to the object parent. scene frame set position relative to the scene.
        """
        super().set_position_orientation(position=position, orientation=orientation, frame=frame)
        self.clear_states_cache()

    @classproperty
    def _do_not_register_classes(cls):
        # Don't register this class since it's an abstract template
        classes = super()._do_not_register_classes
        classes.add("StatefulObject")
        return classes

abilities property

Returns:

Type Description
dict

Dictionary mapping ability name to ability arguments for this object

is_active property

Returns:

Type Description
bool

True if this object is currently considered active -- e.g.: if this object is currently awake

states property

Get the current states of this object.

Returns:

Type Description
dict

Keyword-mapped states for this object

__init__(name, relative_prim_path=None, category='object', scale=None, visible=True, fixed_base=False, visual_only=False, kinematic_only=None, self_collisions=False, prim_type=PrimType.RIGID, load_config=None, abilities=None, include_default_states=True, **kwargs)

Parameters:

Name Type Description Default
name str

Name for the object. Names need to be unique per scene

required
relative_prim_path None or str

The path relative to its scene prim for this object. If not specified, it defaults to /.

None
category str

Category for the object. Defaults to "object".

'object'
scale None or float or 3 - array

if specified, sets either the uniform (float) or x,y,z (3-array) scale for this object. A single number corresponds to uniform scaling along the x,y,z axes, whereas a 3-array specifies per-axis scaling.

None
visible bool

whether to render this object or not in the stage

True
fixed_base bool

whether to fix the base of this object or not

False
visual_only bool

Whether this object should be visual only (and not collide with any other objects)

False
kinematic_only None or bool

Whether this object should be kinematic only (and not get affected by any collisions). If None, then this value will be set to True if @fixed_base is True and some other criteria are satisfied (see object_base.py post_load function), else False.

None
self_collisions bool

Whether to enable self collisions for this object

False
prim_type PrimType

Which type of prim the object is, Valid options are: {PrimType.RIGID, PrimType.CLOTH}

RIGID
load_config None or dict

If specified, should contain keyword-mapped values that are relevant for loading this prim at runtime.

None
abilities None or dict

If specified, manually adds specific object states to this object. It should be a dict in the form of {ability: {param: value}} containing object abilities and parameters to pass to the object state instance constructor.

None
include_default_states bool

whether to include the default object states from @get_default_states

True
kwargs dict

Additional keyword arguments that are used for other super() calls from subclasses, allowing for flexible compositions of various object subclasses (e.g.: Robot is USDObject + ControllableObject).

{}
Source code in omnigibson/objects/stateful_object.py
def __init__(
    self,
    name,
    relative_prim_path=None,
    category="object",
    scale=None,
    visible=True,
    fixed_base=False,
    visual_only=False,
    kinematic_only=None,
    self_collisions=False,
    prim_type=PrimType.RIGID,
    load_config=None,
    abilities=None,
    include_default_states=True,
    **kwargs,
):
    """
    Args:
        name (str): Name for the object. Names need to be unique per scene
        relative_prim_path (None or str): The path relative to its scene prim for this object. If not specified, it defaults to /<name>.
        category (str): Category for the object. Defaults to "object".
        scale (None or float or 3-array): if specified, sets either the uniform (float) or x,y,z (3-array) scale
            for this object. A single number corresponds to uniform scaling along the x,y,z axes, whereas a
            3-array specifies per-axis scaling.
        visible (bool): whether to render this object or not in the stage
        fixed_base (bool): whether to fix the base of this object or not
        visual_only (bool): Whether this object should be visual only (and not collide with any other objects)
        kinematic_only (None or bool): Whether this object should be kinematic only (and not get affected by any
            collisions). If None, then this value will be set to True if @fixed_base is True and some other criteria
            are satisfied (see object_base.py post_load function), else False.
        self_collisions (bool): Whether to enable self collisions for this object
        prim_type (PrimType): Which type of prim the object is, Valid options are: {PrimType.RIGID, PrimType.CLOTH}
        load_config (None or dict): If specified, should contain keyword-mapped values that are relevant for
            loading this prim at runtime.
        abilities (None or dict): If specified, manually adds specific object states to this object. It should be
            a dict in the form of {ability: {param: value}} containing object abilities and parameters to pass to
            the object state instance constructor.
        include_default_states (bool): whether to include the default object states from @get_default_states
        kwargs (dict): Additional keyword arguments that are used for other super() calls from subclasses, allowing
            for flexible compositions of various object subclasses (e.g.: Robot is USDObject + ControllableObject).
    """
    # Values that will be filled later
    self._states = None
    self._emitters = dict()
    self._visual_states = None
    self._current_texture_state = None
    self._include_default_states = include_default_states

    # Load abilities from taxonomy if needed & possible
    if abilities is None:
        abilities = {}
        taxonomy_class = OBJECT_TAXONOMY.get_synset_from_category(category)
        if taxonomy_class is not None:
            abilities = OBJECT_TAXONOMY.get_abilities(taxonomy_class)
    assert isinstance(abilities, dict), "Object abilities must be in dictionary form."
    self._abilities = abilities

    # Run super init
    super().__init__(
        relative_prim_path=relative_prim_path,
        name=name,
        category=category,
        scale=scale,
        visible=visible,
        fixed_base=fixed_base,
        visual_only=visual_only,
        kinematic_only=kinematic_only,
        self_collisions=self_collisions,
        prim_type=prim_type,
        load_config=load_config,
        **kwargs,
    )

add_state(state)

Adds state @state with name @name to self.states.

Parameters:

Name Type Description Default
state ObjectStateBase

Object state instance to add to this object

required
Source code in omnigibson/objects/stateful_object.py
def add_state(self, state):
    """
    Adds state @state with name @name to self.states.

    Args:
        state (ObjectStateBase): Object state instance to add to this object
    """
    assert self._states is not None, "Cannot add state since states have not been initialized yet!"
    assert state.__class__ not in self._states, (
        f"State {state.__class__.__name__} " f"has already been added to this object!"
    )
    self._states[state.__class__] = state

clear_states_cache()

Clears the internal cache from all owned states

Source code in omnigibson/objects/stateful_object.py
def clear_states_cache(self):
    """
    Clears the internal cache from all owned states
    """
    # Check self._states just in case states have not been initialized yet.
    if not self._states:
        return
    for _, obj_state in self._states.items():
        obj_state.clear_cache()

get_textures()

Gets prim's texture files.

Returns:

Type Description
list of str

List of texture file paths

Source code in omnigibson/objects/stateful_object.py
def get_textures(self):
    """
    Gets prim's texture files.

    Returns:
        list of str: List of texture file paths
    """
    return [material.diffuse_texture for material in self.materials if material.diffuse_texture is not None]

prepare_object_states()

Prepare the state dictionary for an object by generating the appropriate object state instances.

This uses the abilities of the object and the state dependency graph to find & instantiate all relevant states.

Source code in omnigibson/objects/stateful_object.py
def prepare_object_states(self):
    """
    Prepare the state dictionary for an object by generating the appropriate
    object state instances.

    This uses the abilities of the object and the state dependency graph to
    find & instantiate all relevant states.
    """
    states_info = (
        {state_type: {"ability": None, "params": dict()} for state_type in get_default_states()}
        if self._include_default_states
        else dict()
    )

    # Map the state type (class) to ability name and params
    if gm.ENABLE_OBJECT_STATES:
        for ability in tuple(self._abilities.keys()):
            # First, sanity check all ability requirements
            compatible = True
            for requirement in get_requirements_for_ability(ability):
                compatible, reason = requirement.is_compatible(obj=self)
                if not compatible:
                    # Print out warning and pop ability
                    log.warning(
                        f"Ability '{ability}' is incompatible with obj {self.name}, "
                        f"because requirement {requirement.__name__} was not met. Reason: {reason}"
                    )
                    self._abilities.pop(ability)
                    break
            if compatible:
                params = self._abilities[ability]
                for state_type in get_states_for_ability(ability):
                    states_info[state_type] = {
                        "ability": ability,
                        "params": state_type.postprocess_ability_params(params, self.scene),
                    }

    # Add the dependencies into the list, too, and sort based on the dependency chain
    # Must iterate over explicit tuple since dictionary changes size mid-iteration
    for state_type in tuple(states_info.keys()):
        # Add each state's dependencies, too. Note that only required dependencies are explicitly added, but both
        # required AND optional dependencies are checked / sorted
        for dependency in state_type.get_dependencies():
            if dependency not in states_info:
                states_info[dependency] = {"ability": None, "params": dict()}

    # Iterate over all sorted state types, generating the states in topological order.
    self._states = dict()
    for state_type in get_states_by_dependency_order(states=states_info):
        # Skip over any types that are not in our info dict -- these correspond to optional dependencies
        if state_type not in states_info:
            continue

        relevant_params = extract_class_init_kwargs_from_dict(
            cls=state_type, dic=states_info[state_type]["params"], copy=False
        )
        compatible, reason = state_type.is_compatible(obj=self, **relevant_params)
        if compatible:
            self._states[state_type] = state_type(obj=self, **relevant_params)
        else:
            log.warning(f"State {state_type.__name__} is incompatible with obj {self.name}. Reason: {reason}")
            # Remove the ability if it exists
            # Note that the object may still have some of the states related to the desired ability. In this way,
            # we guarantee that the existence of a certain ability in self.abilities means at ALL corresponding
            # object state dependencies are met by the underlying object asset
            ability = states_info[state_type]["ability"]
            if ability in self._abilities:
                self._abilities.pop(ability)

set_emitter_enabled(emitter_type, value)

Enable/disable the emitter prim for fire/steam effect.

Parameters:

Name Type Description Default
emitter_type EmitterType

Emitter to set

required
value bool

Value to set

required
Source code in omnigibson/objects/stateful_object.py
def set_emitter_enabled(self, emitter_type, value):
    """
    Enable/disable the emitter prim for fire/steam effect.

    Args:
        emitter_type (EmitterType): Emitter to set
        value (bool): Value to set
    """
    if emitter_type not in self._emitters:
        return
    if value != self._emitters[emitter_type].GetAttribute("enabled").Get():
        self._emitters[emitter_type].GetAttribute("enabled").Set(value)

set_position_orientation(position=None, orientation=None, frame='world')

Set the position and orientation of stateful object.

Parameters:

Name Type Description Default
position None or 3 - array

The position to set the object to. If None, the position is not changed.

None
orientation None or 4 - array

The orientation to set the object to. If None, the orientation is not changed.

None
frame Literal

The frame in which to set the position and orientation. Defaults to world. parent frame

'world'
Source code in omnigibson/objects/stateful_object.py
def set_position_orientation(
    self, position=None, orientation=None, frame: Literal["world", "parent", "scene"] = "world"
):
    """
    Set the position and orientation of stateful object.

    Args:
        position (None or 3-array): The position to set the object to. If None, the position is not changed.
        orientation (None or 4-array): The orientation to set the object to. If None, the orientation is not changed.
        frame (Literal): The frame in which to set the position and orientation. Defaults to world. parent frame
        set position relative to the object parent. scene frame set position relative to the scene.
    """
    super().set_position_orientation(position=position, orientation=orientation, frame=frame)
    self.clear_states_cache()

state_updated()

Adds this object to this object's scene's updated_state_objects set -- generally called externally by owned object state instances when its state is updated. This is useful for tracking when this object has had its state updated within the last simulation step

Source code in omnigibson/objects/stateful_object.py
def state_updated(self):
    """
    Adds this object to this object's scene's updated_state_objects set -- generally called externally
    by owned object state instances when its state is updated. This is useful for tracking when this object
    has had its state updated within the last simulation step
    """
    self.scene.updated_state_objects.add(self)

update_visuals()

Update the prim's visuals (texture change, steam/fire effects, etc). Should be called after all the states are updated.

Source code in omnigibson/objects/stateful_object.py
def update_visuals(self):
    """
    Update the prim's visuals (texture change, steam/fire effects, etc).
    Should be called after all the states are updated.
    """
    if len(self._visual_states) > 0:
        texture_change_states = []
        emitter_enabled = defaultdict(bool)
        for state_type in self._visual_states:
            state = self.states[state_type]
            if state_type in get_texture_change_states():
                if state_type == Saturated:
                    for particle_system in self.scene.active_systems.values():
                        if state.get_value(particle_system):
                            texture_change_states.append(state)
                            # Only need to do this once, since soaked handles all fluid systems
                            break
                elif state.get_value():
                    texture_change_states.append(state)
            if state_type in get_steam_states():
                emitter_enabled[EmitterType.STEAM] |= state.get_value()
            if state_type in get_fire_states():
                emitter_enabled[EmitterType.FIRE] |= state.get_value()

        for emitter_type in emitter_enabled:
            self.set_emitter_enabled(emitter_type, emitter_enabled[emitter_type])

        texture_change_states.sort(key=lambda s: get_texture_change_priority()[s.__class__])
        object_state = texture_change_states[-1] if len(texture_change_states) > 0 else None

        # Only update our texture change if it's a different object state than the one we already have
        if object_state != self._current_texture_state:
            self._update_texture_change(object_state)
            self._current_texture_state = object_state