io_interface.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:intera_sdk 作者: RethinkRobotics 项目源码 文件源码
def __init__(self, path_root, config_msg_type, status_msg_type):
        self._path = path_root
        self.config_mutex = Lock()
        self.state_mutex = Lock()
        self.cmd_times = []
        self.ports = dict()
        self.signals = dict()
        self.state = None
        self.config = None
        self.state_changed = intera_dataflow.Signal()
        self.config_changed = intera_dataflow.Signal()

        self._config_sub = rospy.Subscriber(self._path + "/config",
                                            config_msg_type,
                                            self.handle_config)
        self._state_sub = rospy.Subscriber(self._path + "/state",
                                           status_msg_type,
                                           self.handle_state)
        self._command_pub = rospy.Publisher(self._path + "/command",
                                            IOComponentCommand, queue_size=10)

        # Wait for the state to be populated
        intera_dataflow.wait_for(
                          lambda: not self.state is None,
                          timeout=5.0,
                          timeout_msg=("Failed to get state.")
                          )

        rospy.logdebug("Making new IOInterface on %s" % (self._path,))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号