def __init__(self, path_root, config_msg_type, status_msg_type):
self._path = path_root
self.config_mutex = Lock()
self.state_mutex = Lock()
self.cmd_times = []
self.ports = dict()
self.signals = dict()
self.state = None
self.config = None
self.state_changed = intera_dataflow.Signal()
self.config_changed = intera_dataflow.Signal()
self._config_sub = rospy.Subscriber(self._path + "/config",
config_msg_type,
self.handle_config)
self._state_sub = rospy.Subscriber(self._path + "/state",
status_msg_type,
self.handle_state)
self._command_pub = rospy.Publisher(self._path + "/command",
IOComponentCommand, queue_size=10)
# Wait for the state to be populated
intera_dataflow.wait_for(
lambda: not self.state is None,
timeout=5.0,
timeout_msg=("Failed to get state.")
)
rospy.logdebug("Making new IOInterface on %s" % (self._path,))
评论列表
文章目录