Skip to content

controllable_object

ControllableObject

Bases: BaseObject

Simple class that extends object functionality for controlling joints -- this assumes that at least some joints are motorized (i.e.: non-zero low-level simulator joint motor gains) and intended to be controlled, e.g.: a conveyor belt or a robot agent

Source code in omnigibson/objects/controllable_object.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
class ControllableObject(BaseObject):
    """
    Simple class that extends object functionality for controlling joints -- this assumes that at least some joints
    are motorized (i.e.: non-zero low-level simulator joint motor gains) and intended to be controlled,
    e.g.: a conveyor belt or a robot agent
    """

    def __init__(
        self,
        name,
        relative_prim_path=None,
        category="object",
        scale=None,
        visible=True,
        fixed_base=False,
        visual_only=False,
        self_collisions=False,
        prim_type=PrimType.RIGID,
        load_config=None,
        control_freq=None,
        controller_config=None,
        action_type="continuous",
        action_normalize=True,
        reset_joint_pos=None,
        **kwargs,
    ):
        """
        Args:
            name (str): Name for the object. Names need to be unique per scene
            relative_prim_path (None or str): The path relative to its scene prim for this object. If not specified, it defaults to /<name>.
            category (str): Category for the object. Defaults to "object".
            scale (None or float or 3-array): if specified, sets either the uniform (float) or x,y,z (3-array) scale
                for this object. A single number corresponds to uniform scaling along the x,y,z axes, whereas a
                3-array specifies per-axis scaling.
            visible (bool): whether to render this object or not in the stage
            fixed_base (bool): whether to fix the base of this object or not
            visual_only (bool): Whether this object should be visual only (and not collide with any other objects)
            self_collisions (bool): Whether to enable self collisions for this object
            prim_type (PrimType): Which type of prim the object is, Valid options are: {PrimType.RIGID, PrimType.CLOTH}
            load_config (None or dict): If specified, should contain keyword-mapped values that are relevant for
                loading this prim at runtime.
            control_freq (float): control frequency (in Hz) at which to control the object. If set to be None,
                we will automatically set the control frequency to be at the render frequency by default.
            controller_config (None or dict): nested dictionary mapping controller name(s) to specific controller
                configurations for this object. This will override any default values specified by this class.
            action_type (str): one of {discrete, continuous} - what type of action space to use
            action_normalize (bool): whether to normalize inputted actions. This will override any default values
                specified by this class.
            reset_joint_pos (None or n-array): if specified, should be the joint positions that the object should
                be set to during a reset. If None (default), self._default_joint_pos will be used instead.
                Note that _default_joint_pos are hardcoded & precomputed, and thus should not be modified by the user.
                Set this value instead if you want to initialize the object with a different rese joint position.
            kwargs (dict): Additional keyword arguments that are used for other super() calls from subclasses, allowing
                for flexible compositions of various object subclasses (e.g.: Robot is USDObject + ControllableObject).
        """
        # Store inputs
        self._control_freq = control_freq
        self._controller_config = controller_config
        self._reset_joint_pos = None if reset_joint_pos is None else th.tensor(reset_joint_pos, dtype=th.float)

        # Make sure action type is valid, and also save
        assert_valid_key(key=action_type, valid_keys={"discrete", "continuous"}, name="action type")
        self._action_type = action_type
        self._action_normalize = action_normalize

        # Store internal placeholders that will be filled in later
        self._dof_to_joints = None  # dict that will map DOF indices to JointPrims
        self._last_action = None
        self._controllers = None
        self.dof_names_ordered = None
        self._control_enabled = True

        class_name = self.__class__.__name__.lower()
        if relative_prim_path:
            # If prim path is specified, assert that the last element starts with the right prefix to ensure that
            # the object will be included in the ControllableObjectViewAPI.
            assert relative_prim_path.split("/")[-1].startswith(f"controllable__{class_name}__"), (
                "If relative_prim_path is specified, the last element of the path must look like "
                f"'controllable__{class_name}__robotname' where robotname can be an arbitrary "
                "string containing no double underscores."
            )
            assert relative_prim_path.split("/")[-1].count("__") == 2, (
                "If relative_prim_path is specified, the last element of the path must look like "
                f"'controllable__{class_name}__robotname' where robotname can be an arbitrary "
                "string containing no double underscores."
            )
        else:
            # If prim path is not specified, set it to the default path, but prepend controllable.
            relative_prim_path = f"/controllable__{class_name}__{name}"

        # Run super init
        super().__init__(
            relative_prim_path=relative_prim_path,
            name=name,
            category=category,
            scale=scale,
            visible=visible,
            fixed_base=fixed_base,
            visual_only=visual_only,
            self_collisions=self_collisions,
            prim_type=prim_type,
            load_config=load_config,
            **kwargs,
        )

    def _initialize(self):
        # Assert that the prim path matches ControllableObjectViewAPI's expected format
        scene_id, robot_name = self.articulation_root_path.split("/")[2:4]
        assert scene_id.startswith(
            "scene_"
        ), "Second component of articulation root path (scene ID) must start with 'scene_'"
        robot_name_components = robot_name.split("__")
        assert (
            len(robot_name_components) == 3
        ), "Third component of articulation root path (robot name) must have 3 components separated by '__'"
        assert (
            robot_name_components[0] == "controllable"
        ), "Third component of articulation root path (robot name) must start with 'controllable'"
        assert (
            robot_name_components[1] == self.__class__.__name__.lower()
        ), "Third component of articulation root path (robot name) must contain the class name as the second part"

        # Run super first
        super()._initialize()
        # Fill in the DOF to joint mapping
        self._dof_to_joints = dict()
        idx = 0
        for joint in self._joints.values():
            for _ in range(joint.n_dof):
                self._dof_to_joints[idx] = joint
                idx += 1

        # Update the reset joint pos
        if self._reset_joint_pos is None:
            self._reset_joint_pos = self._default_joint_pos

        # Load controllers
        self._load_controllers()

        # Setup action space
        self._action_space = (
            self._create_discrete_action_space()
            if self._action_type == "discrete"
            else self._create_continuous_action_space()
        )

        # Reset the object and keep all joints still after loading
        self.reset()
        self.keep_still()

    def load(self, scene):
        # Run super first
        prim = super().load(scene)

        # Set the control frequency if one was not provided.
        expected_control_freq = 1.0 / og.sim.get_sim_step_dt()
        if self._control_freq is None:
            log.info(
                "Control frequency is None - being set to default of render_frequency: %.4f", expected_control_freq
            )
            self._control_freq = expected_control_freq
        else:
            assert math.isclose(
                expected_control_freq, self._control_freq
            ), "Stored control frequency does not match environment's render timestep."

        return prim

    def _post_load(self):
        # Call super first
        super()._post_load()

        # For controllable objects, we disable gravity of all links that are not fixed to the base link.
        # This is because we cannot accurately apply gravity compensation in the absence of a working
        # generalized gravity force computation. This may have some side effects on the measured
        # torque on each of these links, but it provides a greatly improved joint control behavior.
        # Note that we do NOT disable gravity for links that are fixed to the base link, as these links
        # are typically where most of the downward force on the robot is applied. Disabling gravity
        # for these links would result in the robot floating in the air easily. Also note that here
        # we use the base link footprint which takes into account the presence of virtual joints.

        # Find all the links that are accessible from the base link footprint via a chain of fixed
        # joints. We will disable gravity for all links that are not in this set.
        articulation_tree = self.articulation_tree
        base_footprint = self.base_footprint_link_name
        is_edge_fixed = lambda f, t: articulation_tree[f][t]["joint_type"] == JointType.JOINT_FIXED
        only_fixed_joints = nx.subgraph_view(articulation_tree, filter_edge=is_edge_fixed)
        fixed_link_names = nx.descendants(only_fixed_joints, base_footprint) | {base_footprint}

        # Find the links that are NOT fixed.
        other_link_names = set(self.links.keys()) - fixed_link_names

        # Disable gravity for those links.
        for link_name in other_link_names:
            self.links[link_name].disable_gravity()

    def _load_controllers(self):
        """
        Loads controller(s) to map inputted actions into executable (pos, vel, and / or effort) signals on this object.
        Stores created controllers as dictionary mapping controller names to specific controller
        instances used by this object.
        """
        # Generate the controller config
        self._controller_config = self._generate_controller_config(custom_config=self._controller_config)

        # Store dof idx mapping to dof name
        self.dof_names_ordered = list(self._joints.keys())

        # Initialize controllers to create
        self._controllers = dict()
        # Loop over all controllers, in the order corresponding to @action dim
        for name in self.controller_order:
            assert_valid_key(key=name, valid_keys=self._controller_config, name="controller name")
            cfg = self._controller_config[name]
            # If we're using normalized action space, override the inputs for all controllers
            if self._action_normalize:
                cfg["command_input_limits"] = "default"  # default is normalized (-1, 1)

            # Create the controller
            controller = create_controller(**cfg)
            # Verify the controller's DOFs can all be driven
            for idx in controller.dof_idx:
                assert self._joints[
                    self.dof_names_ordered[idx]
                ].driven, "Controllers should only control driveable joints!"
            self._controllers[name] = controller
        self.update_controller_mode()

    def update_controller_mode(self):
        """
        Helper function to force the joints to use the internal specified control mode and gains
        """
        # Update the control modes of each joint based on the outputted control from the controllers
        for name in self._controllers:
            for dof in self._controllers[name].dof_idx:
                control_type = self._controllers[name].control_type
                self._joints[self.dof_names_ordered[dof]].set_control_type(
                    control_type=control_type,
                    kp=self.default_kp if control_type == ControlType.POSITION else None,
                    kd=(
                        self.default_kd
                        if control_type == ControlType.POSITION or control_type == ControlType.VELOCITY
                        else None
                    ),
                )

    def _generate_controller_config(self, custom_config=None):
        """
        Generates a fully-populated controller config, overriding any default values with the corresponding values
        specified in @custom_config

        Args:
            custom_config (None or Dict[str, ...]): nested dictionary mapping controller name(s) to specific custom
                controller configurations for this object. This will override any default values specified by this class

        Returns:
            dict: Fully-populated nested dictionary mapping controller name(s) to specific controller configurations for
                this object
        """
        controller_config = {} if custom_config is None else deepcopy(custom_config)

        # Update the configs
        for group in self.controller_order:
            group_controller_name = (
                controller_config[group]["name"]
                if group in controller_config and "name" in controller_config[group]
                else self._default_controllers[group]
            )
            controller_config[group] = merge_nested_dicts(
                base_dict=self._default_controller_config[group][group_controller_name],
                extra_dict=controller_config.get(group, {}),
            )

        return controller_config

    def reload_controllers(self, controller_config=None):
        """
        Reloads controllers based on the specified new @controller_config

        Args:
            controller_config (None or Dict[str, ...]): nested dictionary mapping controller name(s) to specific
                controller configurations for this object. This will override any default values specified by this class.
        """
        self._controller_config = {} if controller_config is None else controller_config

        # (Re-)load controllers
        self._load_controllers()

        # (Re-)create the action space
        self._action_space = (
            self._create_discrete_action_space()
            if self._action_type == "discrete"
            else self._create_continuous_action_space()
        )

    def reset(self):
        # Call super first
        super().reset()

        # Override the reset joint state based on reset values
        self.set_joint_positions(positions=self._reset_joint_pos, drive=False)

    @abstractmethod
    def _create_discrete_action_space(self):
        """
        Create a discrete action space for this object. Should be implemented by the subclass (if a subclass does not
        support this type of action space, it should raise an error).

        Returns:
            gym.space: Object-specific discrete action space
        """
        raise NotImplementedError

    def _create_continuous_action_space(self):
        """
        Create a continuous action space for this object. By default, this loops over all controllers and
        appends their respective input command limits to set the action space.
        Any custom behavior should be implemented by the subclass (e.g.: if a subclass does not
        support this type of action space, it should raise an error).

        Returns:
            gym.space.Box: Object-specific continuous action space
        """
        # Action space is ordered according to the order in _default_controller_config control
        low, high = [], []
        for controller in self._controllers.values():
            limits = controller.command_input_limits
            low.append(th.tensor([-float("inf")] * controller.command_dim) if limits is None else limits[0])
            high.append(th.tensor([float("inf")] * controller.command_dim) if limits is None else limits[1])

        return gym.spaces.Box(
            shape=(self.action_dim,),
            low=th.cat(low).cpu().numpy(),
            high=th.cat(high).cpu().numpy(),
            dtype=NumpyTypes.FLOAT32,
        )

    def apply_action(self, action):
        """
        Converts inputted actions into low-level control signals

        NOTE: This does NOT deploy control on the object. Use self.step() instead.

        Args:
            action (n-array): n-DOF length array of actions to apply to this object's internal controllers
        """
        # Store last action as the current action being applied
        self._last_action = action

        # If we're using discrete action space, we grab the specific action and use that to convert to control
        if self._action_type == "discrete":
            action = th.tensor(self.discrete_action_list[action], dtype=th.float32)

        # Sanity check that action is 1D array
        assert len(action.shape) == 1, f"Action must be 1D array, got {len(action.shape)}D array!"

        # Sanity check that action is 1D array
        assert len(action.shape) == 1, f"Action must be 1D array, got {len(action.shape)}D array!"

        # Check if the input action's length matches the action dimension
        assert len(action) == self.action_dim, "Action must be dimension {}, got dim {} instead.".format(
            self.action_dim, len(action)
        )

        # First, loop over all controllers, and update the desired command
        idx = 0

        for name, controller in self._controllers.items():
            # Set command, then take a controller step
            controller.update_goal(
                command=action[idx : idx + controller.command_dim], control_dict=self.get_control_dict()
            )
            # Update idx
            idx += controller.command_dim

    @property
    def control_enabled(self):
        return self._control_enabled

    @control_enabled.setter
    def control_enabled(self, value):
        self._control_enabled = value

    def step(self):
        """
        Takes a controller step across all controllers and deploys the computed control signals onto the object.
        """
        # Skip if we don't have control enabled
        if not self.control_enabled:
            return

        # Skip this step if our articulation view is not valid
        if self._articulation_view_direct is None or not self._articulation_view_direct.initialized:
            return

        # First, loop over all controllers, and calculate the computed control
        control = dict()
        idx = 0

        # Compose control_dict
        control_dict = self.get_control_dict()

        for name, controller in self._controllers.items():
            control[name] = {
                "value": controller.step(control_dict=control_dict),
                "type": controller.control_type,
            }
            # Update idx
            idx += controller.command_dim

        # Compose controls
        u_vec = th.zeros(self.n_dof)
        # By default, the control type is None and the control value is 0 (th.zeros) - i.e. no control applied
        u_type_vec = th.tensor([ControlType.NONE] * self.n_dof)
        for group, ctrl in control.items():
            idx = self._controllers[group].dof_idx
            u_vec[idx] = ctrl["value"]
            u_type_vec[idx] = ctrl["type"]

        u_vec, u_type_vec = self._postprocess_control(control=u_vec, control_type=u_type_vec)

        # Deploy control signals
        self.deploy_control(control=u_vec, control_type=u_type_vec)

    def _postprocess_control(self, control, control_type):
        """
        Runs any postprocessing on @control with corresponding @control_type on this entity. Default is no-op.
        Deploys control signals @control with corresponding @control_type on this entity.

        Args:
            control (k- or n-array): control signals to deploy. This should be n-DOF length if all joints are being set,
                or k-length (k < n) if specific indices are being set. In this case, the length of @control must
                be the same length as @indices!
            control_type (k- or n-array): control types for each DOF. Each entry should be one of ControlType.
                 This should be n-DOF length if all joints are being set, or k-length (k < n) if specific
                 indices are being set. In this case, the length of @control must be the same length as @indices!

        Returns:
            2-tuple:
                - n-array: raw control signals to send to the object's joints
                - list: control types for each joint
        """
        return control, control_type

    def deploy_control(self, control, control_type):
        """
        Deploys control signals @control with corresponding @control_type on this entity.

        Note: This is DIFFERENT than self.set_joint_positions/velocities/efforts, because in this case we are only
            setting target values (i.e.: we subject this entity to physical dynamics in order to reach the desired
            @control setpoints), compared to set_joint_XXXX which manually sets the actual state of the joints.

            This function is intended to be used with motorized entities, e.g.: robot agents or machines (e.g.: a
            conveyor belt) to simulation physical control of these entities.

            In contrast, use set_joint_XXXX for simulation-specific logic, such as simulator resetting or "magic"
            action implementations.

        Args:
            control (k- or n-array): control signals to deploy. This should be n-DOF length if all joints are being set,
                or k-length (k < n) if specific indices are being set. In this case, the length of @control must
                be the same length as @indices!
            control_type (k- or n-array): control types for each DOF. Each entry should be one of ControlType.
                 This should be n-DOF length if all joints are being set, or k-length (k < n) if specific
                 indices are being set. In this case, the length of @control must be the same length as @indices!
            indices (None or k-array): If specified, should be k (k < n) length array of specific DOF controls to deploy.
                Default is None, which assumes that all joints are being set.
            normalized (bool): Whether the inputted joint controls should be interpreted as normalized
                values. Expects a single bool for the entire @control. Default is False.
        """
        # Run sanity check
        assert len(control) == len(control_type) == self.n_dof, (
            "Control signals, control types, and number of DOF should all be the same!"
            "Got {}, {}, and {} respectively.".format(len(control), len(control_type), self.n_dof)
        )
        # Set indices manually so that we're standardized
        indices = range(self.n_dof)

        # Standardize normalized input
        n_indices = len(indices)

        # Loop through controls and deploy
        # We have to use delicate logic to account for the edge cases where a single joint may contain > 1 DOF
        # (e.g.: spherical joint)
        pos_vec, pos_idxs, using_pos = [], [], False
        vel_vec, vel_idxs, using_vel = [], [], False
        eff_vec, eff_idxs, using_eff = [], [], False
        cur_indices_idx = 0
        while cur_indices_idx != n_indices:
            # Grab the current DOF index we're controlling and find the corresponding joint
            joint = self._dof_to_joints[indices[cur_indices_idx]]
            cur_ctrl_idx = indices[cur_indices_idx]
            joint_dof = joint.n_dof
            if joint_dof > 1:
                # Run additional sanity checks since the joint has more than one DOF to make sure our controls,
                # control types, and indices all match as expected

                # Make sure the indices are mapped correctly
                assert (
                    indices[cur_indices_idx + joint_dof] == cur_ctrl_idx + joint_dof
                ), "Got mismatched control indices for a single joint!"
                # Check to make sure all joints, control_types, and normalized as all the same over n-DOF for the joint
                for group_name, group in zip(
                    ("joints", "control_types"),
                    (self._dof_to_joints, control_type),
                ):
                    assert (
                        len({group[indices[cur_indices_idx + i]] for i in range(joint_dof)}) == 1
                    ), f"Not all {group_name} were the same when trying to deploy control for a single joint!"
                # Assuming this all passes, we grab the control subvector, type, and normalized value accordingly
                ctrl = control[cur_ctrl_idx : cur_ctrl_idx + joint_dof]
            else:
                # Grab specific control. No need to do checks since this is a single value
                ctrl = control[cur_ctrl_idx]

            # Deploy control based on type
            ctrl_type = control_type[
                cur_ctrl_idx
            ]  # In multi-DOF joint case all values were already checked to be the same
            if ctrl_type == ControlType.EFFORT:
                eff_vec.append(ctrl)
                eff_idxs.append(cur_ctrl_idx)
                using_eff = True
            elif ctrl_type == ControlType.VELOCITY:
                vel_vec.append(ctrl)
                vel_idxs.append(cur_ctrl_idx)
                using_vel = True
            elif ctrl_type == ControlType.POSITION:
                pos_vec.append(ctrl)
                pos_idxs.append(cur_ctrl_idx)
                using_pos = True
            elif ctrl_type == ControlType.NONE:
                # Set zero efforts
                eff_vec.append(0)
                eff_idxs.append(cur_ctrl_idx)
                using_eff = True
            else:
                raise ValueError("Invalid control type specified: {}".format(ctrl_type))
            # Finally, increment the current index based on how many DOFs were just controlled
            cur_indices_idx += joint_dof

        # set the targets for joints
        if using_pos:
            ControllableObjectViewAPI.set_joint_position_targets(
                self.articulation_root_path, positions=th.tensor(pos_vec, dtype=th.float), indices=th.tensor(pos_idxs)
            )
        if using_vel:
            ControllableObjectViewAPI.set_joint_velocity_targets(
                self.articulation_root_path, velocities=th.tensor(vel_vec, dtype=th.float), indices=th.tensor(vel_idxs)
            )
        if using_eff:
            ControllableObjectViewAPI.set_joint_efforts(
                self.articulation_root_path, efforts=th.tensor(eff_vec, dtype=th.float), indices=th.tensor(eff_idxs)
            )

    def get_control_dict(self):
        """
        Grabs all relevant information that should be passed to each controller during each controller step. This
        automatically caches information

        Returns:
            CachedFunctions: Keyword-mapped control values for this object, mapping names to n-arrays.
                By default, returns the following (can be queried via [] or get()):

                - joint_position: (n_dof,) joint positions
                - joint_velocity: (n_dof,) joint velocities
                - joint_effort: (n_dof,) joint efforts
                - root_pos: (3,) (x,y,z) global cartesian position of the object's root link
                - root_quat: (4,) (x,y,z,w) global cartesian orientation of ths object's root link
                - mass_matrix: (n_dof, n_dof) mass matrix
                - gravity_force: (n_dof,) per-joint generalized gravity forces
                - cc_force: (n_dof,) per-joint centripetal and centrifugal forces
        """
        # Note that everything here uses the ControllableObjectViewAPI because these are faster implementations of
        # the functions that this class also implements. The API centralizes access for all of the robots in the scene
        # removing the need for multiple reads and writes.
        # TODO(cgokmen): CachedFunctions can now be entirely removed since the ControllableObjectViewAPI already implements caching.
        fcns = CachedFunctions()
        fcns["_root_pos_quat"] = lambda: ControllableObjectViewAPI.get_position_orientation(self.articulation_root_path)
        fcns["root_pos"] = lambda: fcns["_root_pos_quat"][0]
        fcns["root_quat"] = lambda: fcns["_root_pos_quat"][1]
        fcns["root_lin_vel"] = lambda: ControllableObjectViewAPI.get_linear_velocity(self.articulation_root_path)
        fcns["root_ang_vel"] = lambda: ControllableObjectViewAPI.get_angular_velocity(self.articulation_root_path)
        fcns["root_rel_lin_vel"] = lambda: ControllableObjectViewAPI.get_relative_linear_velocity(
            self.articulation_root_path
        )
        fcns["root_rel_ang_vel"] = lambda: ControllableObjectViewAPI.get_relative_angular_velocity(
            self.articulation_root_path
        )
        fcns["joint_position"] = lambda: ControllableObjectViewAPI.get_joint_positions(self.articulation_root_path)
        fcns["joint_velocity"] = lambda: ControllableObjectViewAPI.get_joint_velocities(self.articulation_root_path)
        fcns["joint_effort"] = lambda: ControllableObjectViewAPI.get_joint_efforts(self.articulation_root_path)
        fcns["mass_matrix"] = lambda: ControllableObjectViewAPI.get_mass_matrix(self.articulation_root_path)
        fcns["gravity_force"] = lambda: ControllableObjectViewAPI.get_generalized_gravity_forces(
            self.articulation_root_path
        )
        fcns["cc_force"] = lambda: ControllableObjectViewAPI.get_coriolis_and_centrifugal_forces(
            self.articulation_root_path
        )

        return fcns

    def dump_action(self):
        """
        Dump the last action applied to this object. For use in demo collection.
        """
        return self._last_action

    def set_joint_positions(self, positions, indices=None, normalized=False, drive=False):
        # Call super first
        super().set_joint_positions(positions=positions, indices=indices, normalized=normalized, drive=drive)

        # If we're not driving the joints, reset the controllers so that the goals are updated wrt to the new state
        if not drive:
            for controller in self._controllers.values():
                controller.reset()

    def _dump_state(self):
        # Grab super state
        state = super()._dump_state()

        # Add in controller states
        controller_states = dict()
        for controller_name, controller in self._controllers.items():
            controller_states[controller_name] = controller.dump_state()

        state["controllers"] = controller_states

        return state

    def _load_state(self, state):
        # Run super first
        super()._load_state(state=state)

        # Load controller states
        controller_states = state["controllers"]
        for controller_name, controller in self._controllers.items():
            controller.load_state(state=controller_states[controller_name])

    def serialize(self, state):
        # Run super first
        state_flat = super().serialize(state=state)

        # Serialize the controller states sequentially
        controller_states_flat = th.cat(
            [c.serialize(state=state["controllers"][c_name]) for c_name, c in self._controllers.items()]
        )

        # Concatenate and return
        return th.cat([state_flat, controller_states_flat])

    def deserialize(self, state):
        # Run super first
        state_dict, idx = super().deserialize(state=state)

        # Deserialize the controller states sequentially
        controller_states = dict()
        for c_name, c in self._controllers.items():
            controller_states[c_name], deserialized_items = c.deserialize(state=state[idx:])
            idx += deserialized_items
        state_dict["controllers"] = controller_states

        return state_dict, idx

    @property
    def base_footprint_link_name(self):
        """
        Get the base footprint link name for the controllable object.

        The base footprint link is the link that should be considered the base link for the object
        even in the presence of virtual joints that may be present in the object's articulation. For
        robots without virtual joints, this is the same as the root link. For robots with virtual joints,
        this is the link that is the child of the last virtual joint in the robot's articulation.

        Returns:
            str: Name of the base footprint link for this object
        """
        return self.root_link_name

    @property
    def base_footprint_link(self):
        """
        Get the base footprint link for the controllable object.

        The base footprint link is the link that should be considered the base link for the object
        even in the presence of virtual joints that may be present in the object's articulation. For
        robots without virtual joints, this is the same as the root link. For robots with virtual joints,
        this is the link that is the child of the last virtual joint in the robot's articulation.

        Returns:
            RigidPrim: Base footprint link for this object
        """
        return self.links[self.base_footprint_link_name]

    @property
    def action_dim(self):
        """
        Returns:
            int: Dimension of action space for this object. By default,
                is the sum over all controller action dimensions
        """
        return sum([controller.command_dim for controller in self._controllers.values()])

    @property
    def action_space(self):
        """
        Action space for this object.

        Returns:
            gym.space: Action space, either discrete (Discrete) or continuous (Box)
        """
        return deepcopy(self._action_space)

    @property
    def discrete_action_list(self):
        """
        Discrete choices for actions for this object. Only needs to be implemented if the object supports discrete
        actions.

        Returns:
            dict: Mapping from single action identifier (e.g.: a string, or a number) to array of continuous
                actions to deploy via this object's controllers.
        """
        raise NotImplementedError()

    @property
    def controllers(self):
        """
        Returns:
            dict: Controllers owned by this object, mapping controller name to controller object
        """
        return self._controllers

    @property
    @abstractmethod
    def controller_order(self):
        """
        Returns:
            list: Ordering of the actions, corresponding to the controllers. e.g., ["base", "arm", "gripper"],
                to denote that the action vector should be interpreted as first the base action, then arm command, then
                gripper command
        """
        raise NotImplementedError

    @property
    def controller_action_idx(self):
        """
        Returns:
            dict: Mapping from controller names (e.g.: head, base, arm, etc.) to corresponding
                indices (list) in the action vector
        """
        dic = {}
        idx = 0
        for controller in self.controller_order:
            cmd_dim = self._controllers[controller].command_dim
            dic[controller] = th.arange(idx, idx + cmd_dim)
            idx += cmd_dim

        return dic

    @property
    def controller_joint_idx(self):
        """
        Returns:
            dict: Mapping from controller names (e.g.: head, base, arm, etc.) to corresponding
                indices (list) of the joint state vector controlled by each controller
        """
        dic = {}
        for controller in self.controller_order:
            dic[controller] = self._controllers[controller].dof_idx

        return dic

    # TODO: These are cached, but they are not updated when the joint limit is changed
    @cached_property
    def control_limits(self):
        """
        Returns:
            dict: Keyword-mapped limits for this object. Dict contains:
                position: (min, max) joint limits, where min and max are N-DOF arrays
                velocity: (min, max) joint velocity limits, where min and max are N-DOF arrays
                effort: (min, max) joint effort limits, where min and max are N-DOF arrays
                has_limit: (n_dof,) array where each element is True if that corresponding joint has a position limit
                    (otherwise, joint is assumed to be limitless)
        """
        return {
            "position": (self.joint_lower_limits, self.joint_upper_limits),
            "velocity": (-self.max_joint_velocities, self.max_joint_velocities),
            "effort": (-self.max_joint_efforts, self.max_joint_efforts),
            "has_limit": self.joint_has_limits,
        }

    @property
    def default_kp(self):
        """
        Returns:
            float: Default kp gain to apply to any DOF when switching control modes (e.g.: switching from a
                velocity control mode to a position control mode)
        """
        return 1e7

    @property
    def default_kd(self):
        """
        Returns:
            float: Default kd gain to apply to any DOF when switching control modes (e.g.: switching from a
                position control mode to a velocity control mode)
        """
        return 1e5

    @property
    def reset_joint_pos(self):
        """
        Returns:
            n-array: reset joint positions for this robot
        """
        return self._reset_joint_pos

    @reset_joint_pos.setter
    def reset_joint_pos(self, value):
        """
        Args:
            value: the new reset joint positions for this robot
        """
        self._reset_joint_pos = value

    @property
    @abstractmethod
    def _default_joint_pos(self):
        """
        Returns:
            n-array: Default joint positions for this robot
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def _default_controller_config(self):
        """
        Returns:
            dict: default nested dictionary mapping controller name(s) to specific controller
                configurations for this object. Note that the order specifies the sequence of actions to be received
                from the environment.

                Expected structure is as follows:
                    group1:
                        controller_name1:
                            controller_name1_params
                            ...
                        controller_name2:
                            ...
                    group2:
                        ...

                The @group keys specify the control type for various aspects of the object,
                e.g.: "head", "arm", "base", etc. @controller_name keys specify the supported controllers for
                that group. A default specification MUST be specified for each controller_name.
                e.g.: IKController, DifferentialDriveController, JointController, etc.
        """
        return {}

    @property
    @abstractmethod
    def _default_controllers(self):
        """
        Returns:
            dict: Maps object group (e.g. base, arm, etc.) to default controller class name to use
            (e.g. IKController, JointController, etc.)
        """
        return {}

action_dim property

Returns:

Type Description
int

Dimension of action space for this object. By default, is the sum over all controller action dimensions

action_space property

Action space for this object.

Returns:

Type Description
space

Action space, either discrete (Discrete) or continuous (Box)

Get the base footprint link for the controllable object.

The base footprint link is the link that should be considered the base link for the object even in the presence of virtual joints that may be present in the object's articulation. For robots without virtual joints, this is the same as the root link. For robots with virtual joints, this is the link that is the child of the last virtual joint in the robot's articulation.

Returns:

Type Description
RigidPrim

Base footprint link for this object

Get the base footprint link name for the controllable object.

The base footprint link is the link that should be considered the base link for the object even in the presence of virtual joints that may be present in the object's articulation. For robots without virtual joints, this is the same as the root link. For robots with virtual joints, this is the link that is the child of the last virtual joint in the robot's articulation.

Returns:

Type Description
str

Name of the base footprint link for this object

control_limits cached property

Returns:

Type Description
dict

Keyword-mapped limits for this object. Dict contains: position: (min, max) joint limits, where min and max are N-DOF arrays velocity: (min, max) joint velocity limits, where min and max are N-DOF arrays effort: (min, max) joint effort limits, where min and max are N-DOF arrays has_limit: (n_dof,) array where each element is True if that corresponding joint has a position limit (otherwise, joint is assumed to be limitless)

controller_action_idx property

Returns:

Type Description
dict

Mapping from controller names (e.g.: head, base, arm, etc.) to corresponding indices (list) in the action vector

controller_joint_idx property

Returns:

Type Description
dict

Mapping from controller names (e.g.: head, base, arm, etc.) to corresponding indices (list) of the joint state vector controlled by each controller

controller_order abstractmethod property

Returns:

Type Description
list

Ordering of the actions, corresponding to the controllers. e.g., ["base", "arm", "gripper"], to denote that the action vector should be interpreted as first the base action, then arm command, then gripper command

controllers property

Returns:

Type Description
dict

Controllers owned by this object, mapping controller name to controller object

default_kd property

Returns:

Type Description
float

Default kd gain to apply to any DOF when switching control modes (e.g.: switching from a position control mode to a velocity control mode)

default_kp property

Returns:

Type Description
float

Default kp gain to apply to any DOF when switching control modes (e.g.: switching from a velocity control mode to a position control mode)

discrete_action_list property

Discrete choices for actions for this object. Only needs to be implemented if the object supports discrete actions.

Returns:

Type Description
dict

Mapping from single action identifier (e.g.: a string, or a number) to array of continuous actions to deploy via this object's controllers.

reset_joint_pos property writable

Returns:

Type Description
n - array

reset joint positions for this robot

__init__(name, relative_prim_path=None, category='object', scale=None, visible=True, fixed_base=False, visual_only=False, self_collisions=False, prim_type=PrimType.RIGID, load_config=None, control_freq=None, controller_config=None, action_type='continuous', action_normalize=True, reset_joint_pos=None, **kwargs)

Parameters:

Name Type Description Default
name str

Name for the object. Names need to be unique per scene

required
relative_prim_path None or str

The path relative to its scene prim for this object. If not specified, it defaults to /.

None
category str

Category for the object. Defaults to "object".

'object'
scale None or float or 3 - array

if specified, sets either the uniform (float) or x,y,z (3-array) scale for this object. A single number corresponds to uniform scaling along the x,y,z axes, whereas a 3-array specifies per-axis scaling.

None
visible bool

whether to render this object or not in the stage

True
fixed_base bool

whether to fix the base of this object or not

False
visual_only bool

Whether this object should be visual only (and not collide with any other objects)

False
self_collisions bool

Whether to enable self collisions for this object

False
prim_type PrimType

Which type of prim the object is, Valid options are: {PrimType.RIGID, PrimType.CLOTH}

RIGID
load_config None or dict

If specified, should contain keyword-mapped values that are relevant for loading this prim at runtime.

None
control_freq float

control frequency (in Hz) at which to control the object. If set to be None, we will automatically set the control frequency to be at the render frequency by default.

None
controller_config None or dict

nested dictionary mapping controller name(s) to specific controller configurations for this object. This will override any default values specified by this class.

None
action_type str

one of {discrete, continuous} - what type of action space to use

'continuous'
action_normalize bool

whether to normalize inputted actions. This will override any default values specified by this class.

True
reset_joint_pos None or n - array

if specified, should be the joint positions that the object should be set to during a reset. If None (default), self._default_joint_pos will be used instead. Note that _default_joint_pos are hardcoded & precomputed, and thus should not be modified by the user. Set this value instead if you want to initialize the object with a different rese joint position.

None
kwargs dict

Additional keyword arguments that are used for other super() calls from subclasses, allowing for flexible compositions of various object subclasses (e.g.: Robot is USDObject + ControllableObject).

{}
Source code in omnigibson/objects/controllable_object.py
def __init__(
    self,
    name,
    relative_prim_path=None,
    category="object",
    scale=None,
    visible=True,
    fixed_base=False,
    visual_only=False,
    self_collisions=False,
    prim_type=PrimType.RIGID,
    load_config=None,
    control_freq=None,
    controller_config=None,
    action_type="continuous",
    action_normalize=True,
    reset_joint_pos=None,
    **kwargs,
):
    """
    Args:
        name (str): Name for the object. Names need to be unique per scene
        relative_prim_path (None or str): The path relative to its scene prim for this object. If not specified, it defaults to /<name>.
        category (str): Category for the object. Defaults to "object".
        scale (None or float or 3-array): if specified, sets either the uniform (float) or x,y,z (3-array) scale
            for this object. A single number corresponds to uniform scaling along the x,y,z axes, whereas a
            3-array specifies per-axis scaling.
        visible (bool): whether to render this object or not in the stage
        fixed_base (bool): whether to fix the base of this object or not
        visual_only (bool): Whether this object should be visual only (and not collide with any other objects)
        self_collisions (bool): Whether to enable self collisions for this object
        prim_type (PrimType): Which type of prim the object is, Valid options are: {PrimType.RIGID, PrimType.CLOTH}
        load_config (None or dict): If specified, should contain keyword-mapped values that are relevant for
            loading this prim at runtime.
        control_freq (float): control frequency (in Hz) at which to control the object. If set to be None,
            we will automatically set the control frequency to be at the render frequency by default.
        controller_config (None or dict): nested dictionary mapping controller name(s) to specific controller
            configurations for this object. This will override any default values specified by this class.
        action_type (str): one of {discrete, continuous} - what type of action space to use
        action_normalize (bool): whether to normalize inputted actions. This will override any default values
            specified by this class.
        reset_joint_pos (None or n-array): if specified, should be the joint positions that the object should
            be set to during a reset. If None (default), self._default_joint_pos will be used instead.
            Note that _default_joint_pos are hardcoded & precomputed, and thus should not be modified by the user.
            Set this value instead if you want to initialize the object with a different rese joint position.
        kwargs (dict): Additional keyword arguments that are used for other super() calls from subclasses, allowing
            for flexible compositions of various object subclasses (e.g.: Robot is USDObject + ControllableObject).
    """
    # Store inputs
    self._control_freq = control_freq
    self._controller_config = controller_config
    self._reset_joint_pos = None if reset_joint_pos is None else th.tensor(reset_joint_pos, dtype=th.float)

    # Make sure action type is valid, and also save
    assert_valid_key(key=action_type, valid_keys={"discrete", "continuous"}, name="action type")
    self._action_type = action_type
    self._action_normalize = action_normalize

    # Store internal placeholders that will be filled in later
    self._dof_to_joints = None  # dict that will map DOF indices to JointPrims
    self._last_action = None
    self._controllers = None
    self.dof_names_ordered = None
    self._control_enabled = True

    class_name = self.__class__.__name__.lower()
    if relative_prim_path:
        # If prim path is specified, assert that the last element starts with the right prefix to ensure that
        # the object will be included in the ControllableObjectViewAPI.
        assert relative_prim_path.split("/")[-1].startswith(f"controllable__{class_name}__"), (
            "If relative_prim_path is specified, the last element of the path must look like "
            f"'controllable__{class_name}__robotname' where robotname can be an arbitrary "
            "string containing no double underscores."
        )
        assert relative_prim_path.split("/")[-1].count("__") == 2, (
            "If relative_prim_path is specified, the last element of the path must look like "
            f"'controllable__{class_name}__robotname' where robotname can be an arbitrary "
            "string containing no double underscores."
        )
    else:
        # If prim path is not specified, set it to the default path, but prepend controllable.
        relative_prim_path = f"/controllable__{class_name}__{name}"

    # Run super init
    super().__init__(
        relative_prim_path=relative_prim_path,
        name=name,
        category=category,
        scale=scale,
        visible=visible,
        fixed_base=fixed_base,
        visual_only=visual_only,
        self_collisions=self_collisions,
        prim_type=prim_type,
        load_config=load_config,
        **kwargs,
    )

apply_action(action)

Converts inputted actions into low-level control signals

NOTE: This does NOT deploy control on the object. Use self.step() instead.

Parameters:

Name Type Description Default
action n - array

n-DOF length array of actions to apply to this object's internal controllers

required
Source code in omnigibson/objects/controllable_object.py
def apply_action(self, action):
    """
    Converts inputted actions into low-level control signals

    NOTE: This does NOT deploy control on the object. Use self.step() instead.

    Args:
        action (n-array): n-DOF length array of actions to apply to this object's internal controllers
    """
    # Store last action as the current action being applied
    self._last_action = action

    # If we're using discrete action space, we grab the specific action and use that to convert to control
    if self._action_type == "discrete":
        action = th.tensor(self.discrete_action_list[action], dtype=th.float32)

    # Sanity check that action is 1D array
    assert len(action.shape) == 1, f"Action must be 1D array, got {len(action.shape)}D array!"

    # Sanity check that action is 1D array
    assert len(action.shape) == 1, f"Action must be 1D array, got {len(action.shape)}D array!"

    # Check if the input action's length matches the action dimension
    assert len(action) == self.action_dim, "Action must be dimension {}, got dim {} instead.".format(
        self.action_dim, len(action)
    )

    # First, loop over all controllers, and update the desired command
    idx = 0

    for name, controller in self._controllers.items():
        # Set command, then take a controller step
        controller.update_goal(
            command=action[idx : idx + controller.command_dim], control_dict=self.get_control_dict()
        )
        # Update idx
        idx += controller.command_dim

deploy_control(control, control_type)

Deploys control signals @control with corresponding @control_type on this entity.

This is DIFFERENT than self.set_joint_positions/velocities/efforts, because in this case we are only

setting target values (i.e.: we subject this entity to physical dynamics in order to reach the desired @control setpoints), compared to set_joint_XXXX which manually sets the actual state of the joints.

This function is intended to be used with motorized entities, e.g.: robot agents or machines (e.g.: a conveyor belt) to simulation physical control of these entities.

In contrast, use set_joint_XXXX for simulation-specific logic, such as simulator resetting or "magic" action implementations.

Parameters:

Name Type Description Default
control k- or n-array

control signals to deploy. This should be n-DOF length if all joints are being set, or k-length (k < n) if specific indices are being set. In this case, the length of @control must be the same length as @indices!

required
control_type k- or n-array

control types for each DOF. Each entry should be one of ControlType. This should be n-DOF length if all joints are being set, or k-length (k < n) if specific indices are being set. In this case, the length of @control must be the same length as @indices!

required
indices None or k - array

If specified, should be k (k < n) length array of specific DOF controls to deploy. Default is None, which assumes that all joints are being set.

required
normalized bool

Whether the inputted joint controls should be interpreted as normalized values. Expects a single bool for the entire @control. Default is False.

required
Source code in omnigibson/objects/controllable_object.py
def deploy_control(self, control, control_type):
    """
    Deploys control signals @control with corresponding @control_type on this entity.

    Note: This is DIFFERENT than self.set_joint_positions/velocities/efforts, because in this case we are only
        setting target values (i.e.: we subject this entity to physical dynamics in order to reach the desired
        @control setpoints), compared to set_joint_XXXX which manually sets the actual state of the joints.

        This function is intended to be used with motorized entities, e.g.: robot agents or machines (e.g.: a
        conveyor belt) to simulation physical control of these entities.

        In contrast, use set_joint_XXXX for simulation-specific logic, such as simulator resetting or "magic"
        action implementations.

    Args:
        control (k- or n-array): control signals to deploy. This should be n-DOF length if all joints are being set,
            or k-length (k < n) if specific indices are being set. In this case, the length of @control must
            be the same length as @indices!
        control_type (k- or n-array): control types for each DOF. Each entry should be one of ControlType.
             This should be n-DOF length if all joints are being set, or k-length (k < n) if specific
             indices are being set. In this case, the length of @control must be the same length as @indices!
        indices (None or k-array): If specified, should be k (k < n) length array of specific DOF controls to deploy.
            Default is None, which assumes that all joints are being set.
        normalized (bool): Whether the inputted joint controls should be interpreted as normalized
            values. Expects a single bool for the entire @control. Default is False.
    """
    # Run sanity check
    assert len(control) == len(control_type) == self.n_dof, (
        "Control signals, control types, and number of DOF should all be the same!"
        "Got {}, {}, and {} respectively.".format(len(control), len(control_type), self.n_dof)
    )
    # Set indices manually so that we're standardized
    indices = range(self.n_dof)

    # Standardize normalized input
    n_indices = len(indices)

    # Loop through controls and deploy
    # We have to use delicate logic to account for the edge cases where a single joint may contain > 1 DOF
    # (e.g.: spherical joint)
    pos_vec, pos_idxs, using_pos = [], [], False
    vel_vec, vel_idxs, using_vel = [], [], False
    eff_vec, eff_idxs, using_eff = [], [], False
    cur_indices_idx = 0
    while cur_indices_idx != n_indices:
        # Grab the current DOF index we're controlling and find the corresponding joint
        joint = self._dof_to_joints[indices[cur_indices_idx]]
        cur_ctrl_idx = indices[cur_indices_idx]
        joint_dof = joint.n_dof
        if joint_dof > 1:
            # Run additional sanity checks since the joint has more than one DOF to make sure our controls,
            # control types, and indices all match as expected

            # Make sure the indices are mapped correctly
            assert (
                indices[cur_indices_idx + joint_dof] == cur_ctrl_idx + joint_dof
            ), "Got mismatched control indices for a single joint!"
            # Check to make sure all joints, control_types, and normalized as all the same over n-DOF for the joint
            for group_name, group in zip(
                ("joints", "control_types"),
                (self._dof_to_joints, control_type),
            ):
                assert (
                    len({group[indices[cur_indices_idx + i]] for i in range(joint_dof)}) == 1
                ), f"Not all {group_name} were the same when trying to deploy control for a single joint!"
            # Assuming this all passes, we grab the control subvector, type, and normalized value accordingly
            ctrl = control[cur_ctrl_idx : cur_ctrl_idx + joint_dof]
        else:
            # Grab specific control. No need to do checks since this is a single value
            ctrl = control[cur_ctrl_idx]

        # Deploy control based on type
        ctrl_type = control_type[
            cur_ctrl_idx
        ]  # In multi-DOF joint case all values were already checked to be the same
        if ctrl_type == ControlType.EFFORT:
            eff_vec.append(ctrl)
            eff_idxs.append(cur_ctrl_idx)
            using_eff = True
        elif ctrl_type == ControlType.VELOCITY:
            vel_vec.append(ctrl)
            vel_idxs.append(cur_ctrl_idx)
            using_vel = True
        elif ctrl_type == ControlType.POSITION:
            pos_vec.append(ctrl)
            pos_idxs.append(cur_ctrl_idx)
            using_pos = True
        elif ctrl_type == ControlType.NONE:
            # Set zero efforts
            eff_vec.append(0)
            eff_idxs.append(cur_ctrl_idx)
            using_eff = True
        else:
            raise ValueError("Invalid control type specified: {}".format(ctrl_type))
        # Finally, increment the current index based on how many DOFs were just controlled
        cur_indices_idx += joint_dof

    # set the targets for joints
    if using_pos:
        ControllableObjectViewAPI.set_joint_position_targets(
            self.articulation_root_path, positions=th.tensor(pos_vec, dtype=th.float), indices=th.tensor(pos_idxs)
        )
    if using_vel:
        ControllableObjectViewAPI.set_joint_velocity_targets(
            self.articulation_root_path, velocities=th.tensor(vel_vec, dtype=th.float), indices=th.tensor(vel_idxs)
        )
    if using_eff:
        ControllableObjectViewAPI.set_joint_efforts(
            self.articulation_root_path, efforts=th.tensor(eff_vec, dtype=th.float), indices=th.tensor(eff_idxs)
        )

dump_action()

Dump the last action applied to this object. For use in demo collection.

Source code in omnigibson/objects/controllable_object.py
def dump_action(self):
    """
    Dump the last action applied to this object. For use in demo collection.
    """
    return self._last_action

get_control_dict()

Grabs all relevant information that should be passed to each controller during each controller step. This automatically caches information

Returns:

Type Description
CachedFunctions

Keyword-mapped control values for this object, mapping names to n-arrays. By default, returns the following (can be queried via [] or get()):

  • joint_position: (n_dof,) joint positions
  • joint_velocity: (n_dof,) joint velocities
  • joint_effort: (n_dof,) joint efforts
  • root_pos: (3,) (x,y,z) global cartesian position of the object's root link
  • root_quat: (4,) (x,y,z,w) global cartesian orientation of ths object's root link
  • mass_matrix: (n_dof, n_dof) mass matrix
  • gravity_force: (n_dof,) per-joint generalized gravity forces
  • cc_force: (n_dof,) per-joint centripetal and centrifugal forces
Source code in omnigibson/objects/controllable_object.py
def get_control_dict(self):
    """
    Grabs all relevant information that should be passed to each controller during each controller step. This
    automatically caches information

    Returns:
        CachedFunctions: Keyword-mapped control values for this object, mapping names to n-arrays.
            By default, returns the following (can be queried via [] or get()):

            - joint_position: (n_dof,) joint positions
            - joint_velocity: (n_dof,) joint velocities
            - joint_effort: (n_dof,) joint efforts
            - root_pos: (3,) (x,y,z) global cartesian position of the object's root link
            - root_quat: (4,) (x,y,z,w) global cartesian orientation of ths object's root link
            - mass_matrix: (n_dof, n_dof) mass matrix
            - gravity_force: (n_dof,) per-joint generalized gravity forces
            - cc_force: (n_dof,) per-joint centripetal and centrifugal forces
    """
    # Note that everything here uses the ControllableObjectViewAPI because these are faster implementations of
    # the functions that this class also implements. The API centralizes access for all of the robots in the scene
    # removing the need for multiple reads and writes.
    # TODO(cgokmen): CachedFunctions can now be entirely removed since the ControllableObjectViewAPI already implements caching.
    fcns = CachedFunctions()
    fcns["_root_pos_quat"] = lambda: ControllableObjectViewAPI.get_position_orientation(self.articulation_root_path)
    fcns["root_pos"] = lambda: fcns["_root_pos_quat"][0]
    fcns["root_quat"] = lambda: fcns["_root_pos_quat"][1]
    fcns["root_lin_vel"] = lambda: ControllableObjectViewAPI.get_linear_velocity(self.articulation_root_path)
    fcns["root_ang_vel"] = lambda: ControllableObjectViewAPI.get_angular_velocity(self.articulation_root_path)
    fcns["root_rel_lin_vel"] = lambda: ControllableObjectViewAPI.get_relative_linear_velocity(
        self.articulation_root_path
    )
    fcns["root_rel_ang_vel"] = lambda: ControllableObjectViewAPI.get_relative_angular_velocity(
        self.articulation_root_path
    )
    fcns["joint_position"] = lambda: ControllableObjectViewAPI.get_joint_positions(self.articulation_root_path)
    fcns["joint_velocity"] = lambda: ControllableObjectViewAPI.get_joint_velocities(self.articulation_root_path)
    fcns["joint_effort"] = lambda: ControllableObjectViewAPI.get_joint_efforts(self.articulation_root_path)
    fcns["mass_matrix"] = lambda: ControllableObjectViewAPI.get_mass_matrix(self.articulation_root_path)
    fcns["gravity_force"] = lambda: ControllableObjectViewAPI.get_generalized_gravity_forces(
        self.articulation_root_path
    )
    fcns["cc_force"] = lambda: ControllableObjectViewAPI.get_coriolis_and_centrifugal_forces(
        self.articulation_root_path
    )

    return fcns

reload_controllers(controller_config=None)

Reloads controllers based on the specified new @controller_config

Parameters:

Name Type Description Default
controller_config None or Dict[str, ...]

nested dictionary mapping controller name(s) to specific controller configurations for this object. This will override any default values specified by this class.

None
Source code in omnigibson/objects/controllable_object.py
def reload_controllers(self, controller_config=None):
    """
    Reloads controllers based on the specified new @controller_config

    Args:
        controller_config (None or Dict[str, ...]): nested dictionary mapping controller name(s) to specific
            controller configurations for this object. This will override any default values specified by this class.
    """
    self._controller_config = {} if controller_config is None else controller_config

    # (Re-)load controllers
    self._load_controllers()

    # (Re-)create the action space
    self._action_space = (
        self._create_discrete_action_space()
        if self._action_type == "discrete"
        else self._create_continuous_action_space()
    )

step()

Takes a controller step across all controllers and deploys the computed control signals onto the object.

Source code in omnigibson/objects/controllable_object.py
def step(self):
    """
    Takes a controller step across all controllers and deploys the computed control signals onto the object.
    """
    # Skip if we don't have control enabled
    if not self.control_enabled:
        return

    # Skip this step if our articulation view is not valid
    if self._articulation_view_direct is None or not self._articulation_view_direct.initialized:
        return

    # First, loop over all controllers, and calculate the computed control
    control = dict()
    idx = 0

    # Compose control_dict
    control_dict = self.get_control_dict()

    for name, controller in self._controllers.items():
        control[name] = {
            "value": controller.step(control_dict=control_dict),
            "type": controller.control_type,
        }
        # Update idx
        idx += controller.command_dim

    # Compose controls
    u_vec = th.zeros(self.n_dof)
    # By default, the control type is None and the control value is 0 (th.zeros) - i.e. no control applied
    u_type_vec = th.tensor([ControlType.NONE] * self.n_dof)
    for group, ctrl in control.items():
        idx = self._controllers[group].dof_idx
        u_vec[idx] = ctrl["value"]
        u_type_vec[idx] = ctrl["type"]

    u_vec, u_type_vec = self._postprocess_control(control=u_vec, control_type=u_type_vec)

    # Deploy control signals
    self.deploy_control(control=u_vec, control_type=u_type_vec)

update_controller_mode()

Helper function to force the joints to use the internal specified control mode and gains

Source code in omnigibson/objects/controllable_object.py
def update_controller_mode(self):
    """
    Helper function to force the joints to use the internal specified control mode and gains
    """
    # Update the control modes of each joint based on the outputted control from the controllers
    for name in self._controllers:
        for dof in self._controllers[name].dof_idx:
            control_type = self._controllers[name].control_type
            self._joints[self.dof_names_ordered[dof]].set_control_type(
                control_type=control_type,
                kp=self.default_kp if control_type == ControlType.POSITION else None,
                kd=(
                    self.default_kd
                    if control_type == ControlType.POSITION or control_type == ControlType.VELOCITY
                    else None
                ),
            )