def receive_1_cb(self, msg):
#print 'message received'
#print msg
im = self.br.imgmsg_to_cv2(msg, desired_encoding='passthrough')
for i in range(0, im.shape[0]):
for j in range(0, im.shape[1]):
#if not (im[i, j, 0] == 255 and im[i, j, 1] == 0 and im[i, j, 2] == 0):
if im[i, j, 0] == 255 and im[i, j, 1] == 0 and im[i, j, 2] == 0:
#print 'Detect an intruder!'
msg_sended = Bool()
msg_sended.data = True
send = FunctionUnit.create_send(self, self._send_topic, Bool)
send.send(msg_sended)
virtual_msg = Bool()
virtual_msg.data = False
self._virtual_send.send(virtual_msg)
python类Bool()的实例源码
def connect_topics(
src_topic, dest_topic, src_topic_type, dest_topic_type, multiplier=1,
deadband=0
):
rospy.loginfo("Connecting topic {} to topic {}".format(
src_topic, dest_topic
))
pub = rospy.Publisher(dest_topic, dest_topic_type, queue_size=10)
def callback(src_item):
val = src_item.data
val *= multiplier
if dest_topic_type == Bool:
val = (val > deadband)
dest_item = dest_topic_type(val)
pub.publish(dest_item)
sub = rospy.Subscriber(src_topic, src_topic_type, callback)
return sub, pub
def main():
rospy.init_node("evaluation_ac")
ac = ACControllerSimulator()
rospy.Subscriber("/aide/ac_control", Bool, callback=lambda msg: ac.increase_ac() if msg.data else ac.decrease_ac())
console = Console()
console.preprocess = lambda source: source[3:]
atexit.register(loginfo, "Going down by user-input.")
ac_thread = Thread(target=ac.simulate)
ac_thread.daemon = True
pub_thread = Thread(target=publish, args=(ac, ))
pub_thread.daemon = True
pub_thread.start()
while not rospy.is_shutdown():
try:
command = console.raw_input("ac> ")
if command == "start":
ac_thread.start()
if command == "end":
return
except EOFError:
print("")
ac.finished = True
rospy.signal_shutdown("Going down.")
action_database.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def execute(self,userdata):
time.sleep(1)
self.perception_pub.publish(Bool(True))
time.sleep(self.perception_time)
self.perception_pub.publish(Bool(False))
frames = self.listen.getFrameStrings()
object_frames = [of for of in frames if of in self.object_list]
if object_frames[0] and object_frames[1]:
translation1, quaternion1 = self.listen.lookupTransform("/root", object_frames[0], rospy.Time(0))
translation2, quaternion2 = self.listen.lookupTransform("/root", object_frames[1], rospy.Time(0))
if translation1[2] > translation2[2]:
userdata.place_obj_name_out = object_frames[0]
userdata.pick_obj_name_out = object_frames[1]
else:
userdata.place_obj_name_out = object_frames[1]
userdata.pick_obj_name_out = object_frames[0]
return 'objects_found'
else:
return 'objects_not_found'
def __init__(self):
self.services = {'record': {'name': 'perception/record', 'type': Record}}
for service_name, service in self.services.items():
rospy.loginfo("Controller is waiting service {}...".format(service['name']))
rospy.wait_for_service(service['name'])
service['call'] = rospy.ServiceProxy(service['name'], service['type'])
# Buttons switches + LEDs
self.prefix = 'sensors'
self.button_leds_topics = ['button_leds/help', 'button_leds/pause']
self.buttons_topics = ['buttons/help', 'buttons/pause']
self.subscribers = [rospy.Subscriber('/'.join([self.prefix, topic]), Bool, lambda msg: self._cb_button_pressed(msg, topic)) for topic in self.buttons_topics]
self.publishers = {topic: rospy.Publisher('/'.join([self.prefix, topic]), Bool, queue_size=1) for topic in self.button_leds_topics}
self.button_leds_status = {topic: False for topic in self.button_leds_topics}
self.button_pressed = {topic: False for topic in self.buttons_topics}
self.last_press = {topic: rospy.Time(0) for topic in self.buttons_topics}
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'perception.json')) as f:
self.params = json.load(f)
def __init__(self):
self.services = {'set_iteration': {'name': 'learning/set_iteration', 'type': SetIteration},
'set_focus': {'name': 'learning/set_interest', 'type': SetFocus},
'assess': {'name': 'controller/assess', 'type': Assess},
'get_interests': {'name': 'learning/get_interests', 'type': GetInterests}}
rospy.Subscriber('learning/current_focus', String, self._cb_focus)
rospy.Subscriber('learning/user_focus', String, self._cb_user_focus)
rospy.Subscriber('learning/ready_for_interaction', Bool, self._cb_ready)
self.current_focus = ""
self.user_focus = ""
self.ready_for_interaction = False
for service_name, service in self.services.items():
rospy.loginfo("User node is waiting service {}...".format(service['name']))
rospy.wait_for_service(service['name'])
service['call'] = rospy.ServiceProxy(service['name'], service['type'])
rospy.loginfo("User node started!")
def __init__(self):
self.data_dir = rospy.get_param('/store_data/data_dir')
self.category = rospy.get_param('/store_data/category')
self.pic_count = rospy.get_param('/store_data/init_idx')
self.rate = 1/float(rospy.get_param('/store_data/rate'))
self.bridge = CvBridge()
self.image_sub = rospy.Subscriber('/camera/rgb/image_color', Image, self.img_cb)
self.patches_sub = rospy.Subscriber('/candidate_regions_depth', PolygonStamped, self.patches_cb)
self.capture_sub = rospy.Subscriber('/capture/keyboard', Bool, self.capture_cb)
# you can read this value off of your sensor from the '/camera/depth_registered/camera_info' topic
self.P = np.array([[525.0, 0.0, 319.5, 0.0], [0.0, 525.0, 239.5, 0.0], [0.0, 0.0, 1.0, 0.0]])
self.capture = False
motivational_behavior.py 文件源码
项目:micros_mars_task_alloc
作者: liminglong
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def run(self):
FunctionUnit.init_node(self)
thread.start_new_thread(self.timer,())
heartbeat_receive = FunctionUnit.create_receive(self, self._heartbeat_topic, Heartbeat, self.heartbeat_on_received)
self._heartbeat_send = FunctionUnit.create_send(self, 'heart_beat', Heartbeat)
self._switch_send = FunctionUnit.create_send(self, self._switch_topic, Bool)
#sensory_receive = FunctionUnit.create_receive(self, self._node_name+"/sensory_feedback", Bool, self.sensory_on_received)
FunctionUnit.spin(self)
# def start_motive(self):
#FunctionUnit.init_node(self)
#heartbeat_receive = FunctionUnit.create_receive(self, self._heartbeat_topic, Heartbeat, self.heartbeat_on_received)
#FunctionUnit.spin(self)
motivational_behavior.py 文件源码
项目:micros_mars_task_alloc
作者: liminglong
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def set_sensor(self,sensor_topic):
sensory_receive = FunctionUnit.create_receive(self, sensor_topic, Bool, self.sensory_on_received)
def __init__(self, node_name, receive_topic, send_topic, virtual_off= None):
FunctionUnit.__init__(self, node_name)
self._receive_topic = receive_topic
self._send_topic = send_topic
self._virtual = virtual_off
self.br = CvBridge()
self._virtual_send = FunctionUnit.create_send(self, virtual_off, Bool)
def start_switch(self):
FunctionUnit.init_node(self)
#print 'run'
if not(self._topic_1 == None):
receive_1 = FunctionUnit.create_receive(self, self._topic_1, self._msg_type_1, self.receive_cb_1)
if not(self._topic_2 == None):
receive_2 = FunctionUnit.create_receive(self, self._topic_2, self._msg_type_2, self.receive_cb_2)
receive_3 = FunctionUnit.create_receive(self, self._motive_topic_1, Bool, self.motivational_shared_var_update)
FunctionUnit.spin(self)
def test_import_absolute_msg(self):
print_importers()
# Verify that files exists and are importable
import std_msgs.msg as std_msgs
self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
def test_import_class_from_absolute_msg(self):
"""Verify that"""
print_importers()
# Verify that files exists and are importable
from std_msgs.msg import Bool, Header
self.assert_std_message_classes(Bool, Header)
def test_double_import_uses_cache(self):
print_importers()
import std_msgs.msg as std_msgs
self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
import std_msgs.msg as std_msgs2
self.assertTrue(std_msgs == std_msgs2)
def test_import_absolute_msg(self):
print_importers()
# Verify that files exists and are importable
import std_msgs.msg as std_msgs
print_importers_of(std_msgs)
self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
def test_import_class_from_absolute_msg(self):
"""Verify that"""
print_importers()
# Verify that files exists and are importable
from std_msgs.msg import Bool, Header
self.assert_std_message_classes(Bool, Header)
def increase_performance(room):
"""
Increases the performance of the AC adaptor in a given room by one step.
:param room: ID of the room
:type room: str
:return:
"""
__pub.publish(Bool(True))
def decrease_performance(room):
"""
Decreases the performance of the AC adaptor in a given room by one step.
:param room: ID of the room
:type room: str
:return:
"""
__pub.publish(Bool(False))
jaco_signals.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 45
收藏 0
点赞 0
评论 0
def __init__(self):
self.objectDet = [False] * 3
self.sai_calibration = [None] * 3
self.current_fingers_touch = [False]*3
self.calibrate = False
self.calibrate_vals = [deque(maxlen=100),deque(maxlen=100),deque(maxlen=100)]
self.object_det_calibrated_pub = rospy.Publisher("/finger_sensor/obj_det_calibrated", Bool, queue_size=1)
self.calibrate_obj_det_sub = rospy.Subscriber("/finger_sensor/calibrate_obj_det", Bool, self.set_calibrate)
self.fai_sub = rospy.Subscriber("/finger_sensor/fai", FingerFAI, self.detect_touch)
self.sai_sub = rospy.Subscriber("/finger_sensor/sai", FingerSAI, self.detect_object)
self.object_det_pub = rospy.Publisher("/finger_sensor/obj_detected", FingerDetect, queue_size=1)
self.touch_pub = rospy.Publisher("/finger_sensor/touch", FingerTouch, queue_size=1)
def _calibrate(self):
self.state = "calibrate"
self.alignment_pub.publish(Bool(True))
def _perceive(self):
self.state = "perceive"
rospy.loginfo("Asking for perception...")
self.perception_pub.publish(Bool(True))
def _stop_perceive(self):
self.state = "perceive"
self.perception_pub.publish(Bool(False))
action_database.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def __init__(self,object_list,perception_time=2.5):
smach.State.__init__(self, outcomes=['objects_found', 'objects_not_found'],
output_keys=['pick_obj_name_out','place_obj_name_out'])
self.perception_pub = rospy.Publisher("/perception/enabled",
Bool,
queue_size=1)
self.listen = tf.TransformListener()
self.perception_time = perception_time
self.object_list = object_list
action_database.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def __init__(self,pos=None):
smach.State.__init__(self, outcomes=['calibrated','not_calibrated'])
self.calibrate_pub = rospy.Publisher('/finger_sensor/calibrate_obj_det',
Bool,
queue_size=1)
self.calibrated_sub = rospy.Subscriber('/finger_sensor/obj_det_calibrated',
Bool,
self.set_calibrated)
self.calibrated = None
self.finger_pos = pos
self.jgn = JacoGripper()
perception.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def __init__(self):
super(PerceptionNode, self).__init__('p_node')
self.transition_table = {
# If calibration has already happened, allow skipping it
'initial': {'q': self._perceive},
'perceive': {'q': self._post_perceive},
}
# Hardcoded place for now
self.tl = tf.TransformListener()
self.num_objects = 0
# Would this work too? Both tf and tf2 have (c) 2008...
# self.tf2 = tf2_ros.TransformListener()
self.n_objects_sub = rospy.Subscriber("/num_objects", Int64,
self.update_num_objects,
queue_size=1)
self.perception_pub = rospy.Publisher("/perception/enabled",
Bool,
queue_size=1)
self.br = tf.TransformBroadcaster()
# Dict to map imarker names and their updated poses
self.int_markers = {}
# Ideally this call would be in a Factory/Metaclass/Parent
self.show_options()
perception.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _perceive(self):
self.state = "perceive"
rospy.loginfo("Asking for perception...")
self.perception_pub.publish(Bool(True))
def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
self.params = json.load(f)
self.button = Button(self.params)
self.rate = rospy.Rate(self.params['publish_rate'])
self.button.switch_led(False)
# Service callers
self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name'])
self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name'])
rospy.loginfo("Ergo node is waiting for poppy controllers...")
rospy.wait_for_service(self.robot_reach_srv_name)
rospy.wait_for_service(self.robot_compliant_srv_name)
self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget)
self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant)
rospy.loginfo("Controllers connected!")
self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1)
self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1)
self.goals = []
self.goal = 0.
self.joy_x = 0.
self.joy_y = 0.
self.motion_started_joy = 0.
self.js = JointState()
rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy)
rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js)
rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led)
self.t = rospy.Time.now()
self.srv_reset = None
self.extended = False
self.standby = False
self.last_activity = rospy.Time.now()
self.delta_t = rospy.Time.now()
def publish_button(self):
self.button_pub.publish(Bool(data=self.button.pressed))
def publish(self):
if self.learning is not None:
with self.lock_iteration:
focus = copy(self.focus)
ready = copy(self.ready_for_interaction)
self.pub_ready.publish(Bool(data=ready))
self.pub_user_focus.publish(String(data=focus if focus is not None else ""))
self.pub_focus.publish(String(data=self.learning.get_last_focus()))
self.pub_iteration.publish(UInt32(data=self.learning.get_iterations()))
################################# Service callbacks
def record(self, human_demo, nb_points):
call = self.services['record']['call']
return call(RecordRequest(human_demo=Bool(data=human_demo), nb_points=UInt8(data=nb_points)))