python类Bool()的实例源码

robot_detect.py 文件源码 项目:micros_mars_task_alloc 作者: liminglong 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def receive_1_cb(self, msg):
        #print 'message received'
        #print msg
        im = self.br.imgmsg_to_cv2(msg, desired_encoding='passthrough')
        for i in range(0, im.shape[0]):
            for j in range(0, im.shape[1]):
                #if not (im[i, j, 0] == 255 and im[i, j, 1] == 0 and im[i, j, 2] == 0):
                if im[i, j, 0] == 255 and im[i, j, 1] == 0 and im[i, j, 2] == 0:
                    #print 'Detect an intruder!'      
                    msg_sended = Bool()
                    msg_sended.data = True             
                    send = FunctionUnit.create_send(self, self._send_topic, Bool)
                    send.send(msg_sended)
                    virtual_msg = Bool()
                    virtual_msg.data = False
                    self._virtual_send.send(virtual_msg)
topic_connector.py 文件源码 项目:openag_brain 作者: OpenAgInitiative 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect_topics(
    src_topic, dest_topic, src_topic_type, dest_topic_type, multiplier=1,
    deadband=0
):
    rospy.loginfo("Connecting topic {} to topic {}".format(
        src_topic, dest_topic
    ))
    pub = rospy.Publisher(dest_topic, dest_topic_type, queue_size=10)
    def callback(src_item):
        val = src_item.data
        val *= multiplier
        if dest_topic_type == Bool:
            val = (val > deadband)
        dest_item = dest_topic_type(val)
        pub.publish(dest_item)
    sub = rospy.Subscriber(src_topic, src_topic_type, callback)
    return sub, pub
ac_control.py 文件源码 项目:aide 作者: Lambda-3 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    rospy.init_node("evaluation_ac")
    ac = ACControllerSimulator()
    rospy.Subscriber("/aide/ac_control", Bool, callback=lambda msg: ac.increase_ac() if msg.data else ac.decrease_ac())

    console = Console()
    console.preprocess = lambda source: source[3:]
    atexit.register(loginfo, "Going down by user-input.")
    ac_thread = Thread(target=ac.simulate)
    ac_thread.daemon = True
    pub_thread = Thread(target=publish, args=(ac, ))
    pub_thread.daemon = True
    pub_thread.start()
    while not rospy.is_shutdown():
        try:
            command = console.raw_input("ac> ")
            if command == "start":
                ac_thread.start()
            if command == "end":
                return

        except EOFError:
            print("")
            ac.finished = True
            rospy.signal_shutdown("Going down.")
action_database.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def execute(self,userdata):
        time.sleep(1)
        self.perception_pub.publish(Bool(True))
        time.sleep(self.perception_time)
        self.perception_pub.publish(Bool(False))
        frames = self.listen.getFrameStrings()
        object_frames = [of for of in frames if of in self.object_list]
        if object_frames[0] and object_frames[1]:
            translation1, quaternion1 = self.listen.lookupTransform("/root", object_frames[0], rospy.Time(0))
            translation2, quaternion2 = self.listen.lookupTransform("/root", object_frames[1], rospy.Time(0))
            if translation1[2] > translation2[2]:
                userdata.place_obj_name_out = object_frames[0]
                userdata.pick_obj_name_out = object_frames[1]
            else:
                userdata.place_obj_name_out = object_frames[1]
                userdata.pick_obj_name_out = object_frames[0]
            return 'objects_found'
        else:
            return 'objects_not_found'
perception.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        self.services = {'record': {'name': 'perception/record', 'type': Record}}
        for service_name, service in self.services.items():
            rospy.loginfo("Controller is waiting service {}...".format(service['name']))
            rospy.wait_for_service(service['name'])
            service['call'] = rospy.ServiceProxy(service['name'], service['type'])

        # Buttons switches + LEDs
        self.prefix = 'sensors'
        self.button_leds_topics = ['button_leds/help', 'button_leds/pause']
        self.buttons_topics = ['buttons/help', 'buttons/pause']
        self.subscribers = [rospy.Subscriber('/'.join([self.prefix, topic]), Bool, lambda msg: self._cb_button_pressed(msg, topic)) for topic in self.buttons_topics]
        self.publishers = {topic: rospy.Publisher('/'.join([self.prefix, topic]), Bool, queue_size=1) for topic in self.button_leds_topics}
        self.button_leds_status = {topic: False for topic in self.button_leds_topics}
        self.button_pressed = {topic: False for topic in self.buttons_topics}
        self.last_press = {topic: rospy.Time(0) for topic in self.buttons_topics}

        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'perception.json')) as f:
            self.params = json.load(f)
services.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self):
        self.services = {'set_iteration': {'name': 'learning/set_iteration', 'type': SetIteration},
                         'set_focus': {'name': 'learning/set_interest', 'type': SetFocus},
                         'assess': {'name': 'controller/assess', 'type': Assess},
                         'get_interests': {'name': 'learning/get_interests', 'type': GetInterests}}

        rospy.Subscriber('learning/current_focus', String, self._cb_focus)
        rospy.Subscriber('learning/user_focus', String, self._cb_user_focus)
        rospy.Subscriber('learning/ready_for_interaction', Bool, self._cb_ready)

        self.current_focus = ""
        self.user_focus = ""
        self.ready_for_interaction = False

        for service_name, service in self.services.items():
            rospy.loginfo("User node is waiting service {}...".format(service['name']))
            rospy.wait_for_service(service['name'])
            service['call'] = rospy.ServiceProxy(service['name'], service['type'])

        rospy.loginfo("User node started!")
store_data.py 文件源码 项目:geom_rcnn 作者: asbroad 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
        self.data_dir = rospy.get_param('/store_data/data_dir')
        self.category = rospy.get_param('/store_data/category')
        self.pic_count = rospy.get_param('/store_data/init_idx')
        self.rate = 1/float(rospy.get_param('/store_data/rate'))

        self.bridge = CvBridge()
        self.image_sub = rospy.Subscriber('/camera/rgb/image_color', Image, self.img_cb)
        self.patches_sub = rospy.Subscriber('/candidate_regions_depth', PolygonStamped, self.patches_cb)
        self.capture_sub = rospy.Subscriber('/capture/keyboard', Bool, self.capture_cb)
        # you can read this value off of your sensor from the '/camera/depth_registered/camera_info' topic
        self.P = np.array([[525.0, 0.0, 319.5, 0.0], [0.0, 525.0, 239.5, 0.0], [0.0, 0.0, 1.0, 0.0]])

        self.capture = False
motivational_behavior.py 文件源码 项目:micros_mars_task_alloc 作者: liminglong 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self):
        FunctionUnit.init_node(self)
        thread.start_new_thread(self.timer,())
        heartbeat_receive = FunctionUnit.create_receive(self, self._heartbeat_topic, Heartbeat, self.heartbeat_on_received)
        self._heartbeat_send = FunctionUnit.create_send(self, 'heart_beat', Heartbeat)
        self._switch_send = FunctionUnit.create_send(self, self._switch_topic, Bool)
        #sensory_receive = FunctionUnit.create_receive(self, self._node_name+"/sensory_feedback", Bool, self.sensory_on_received)
        FunctionUnit.spin(self)

   # def start_motive(self):
        #FunctionUnit.init_node(self)
        #heartbeat_receive = FunctionUnit.create_receive(self, self._heartbeat_topic, Heartbeat, self.heartbeat_on_received)
        #FunctionUnit.spin(self)
motivational_behavior.py 文件源码 项目:micros_mars_task_alloc 作者: liminglong 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def set_sensor(self,sensor_topic):
          sensory_receive = FunctionUnit.create_receive(self, sensor_topic, Bool, self.sensory_on_received)
robot_detect.py 文件源码 项目:micros_mars_task_alloc 作者: liminglong 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, node_name, receive_topic, send_topic, virtual_off= None):
        FunctionUnit.__init__(self, node_name)    
        self._receive_topic = receive_topic
        self._send_topic = send_topic
        self._virtual = virtual_off
        self.br = CvBridge()
        self._virtual_send = FunctionUnit.create_send(self, virtual_off, Bool)
switch.py 文件源码 项目:micros_mars_task_alloc 作者: liminglong 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def start_switch(self):
        FunctionUnit.init_node(self)
        #print 'run'
        if not(self._topic_1 == None):
            receive_1 = FunctionUnit.create_receive(self, self._topic_1, self._msg_type_1, self.receive_cb_1)
        if not(self._topic_2 == None):
            receive_2 = FunctionUnit.create_receive(self, self._topic_2, self._msg_type_2, self.receive_cb_2)
        receive_3 = FunctionUnit.create_receive(self, self._motive_topic_1, Bool, self.motivational_shared_var_update)
        FunctionUnit.spin(self)
test_rosmsg_import.py 文件源码 项目:rosimport 作者: pyros-dev 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_import_absolute_msg(self):
        print_importers()

        # Verify that files exists and are importable
        import std_msgs.msg as std_msgs

        self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
test_rosmsg_import.py 文件源码 项目:rosimport 作者: pyros-dev 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_import_class_from_absolute_msg(self):
        """Verify that"""
        print_importers()

        # Verify that files exists and are importable
        from std_msgs.msg import Bool, Header

        self.assert_std_message_classes(Bool, Header)
test_rosmsg_import.py 文件源码 项目:rosimport 作者: pyros-dev 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_double_import_uses_cache(self):
        print_importers()

        import std_msgs.msg as std_msgs

        self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)

        import std_msgs.msg as std_msgs2

        self.assertTrue(std_msgs == std_msgs2)
test_rosmsg_import.py 文件源码 项目:rosimport 作者: pyros-dev 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_import_absolute_msg(self):
        print_importers()

        # Verify that files exists and are importable
        import std_msgs.msg as std_msgs

        print_importers_of(std_msgs)

        self.assert_std_message_classes(std_msgs.Bool, std_msgs.Header)
test_rosmsg_import.py 文件源码 项目:rosimport 作者: pyros-dev 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_import_class_from_absolute_msg(self):
        """Verify that"""
        print_importers()

        # Verify that files exists and are importable
        from std_msgs.msg import Bool, Header

        self.assert_std_message_classes(Bool, Header)
air_conditioner.py 文件源码 项目:aide 作者: Lambda-3 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def increase_performance(room):
    """
    Increases the performance of the AC adaptor in a given room by one step.


    :param room: ID of the room
    :type room: str
    :return:
    """
    __pub.publish(Bool(True))
air_conditioner.py 文件源码 项目:aide 作者: Lambda-3 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def decrease_performance(room):
    """
    Decreases the performance of the AC adaptor in a given room by one step.


    :param room: ID of the room
    :type room: str
    :return:
    """
    __pub.publish(Bool(False))
jaco_signals.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def __init__(self):
        self.objectDet = [False] * 3
        self.sai_calibration = [None] * 3
        self.current_fingers_touch = [False]*3
        self.calibrate = False
        self.calibrate_vals = [deque(maxlen=100),deque(maxlen=100),deque(maxlen=100)]

        self.object_det_calibrated_pub = rospy.Publisher("/finger_sensor/obj_det_calibrated", Bool, queue_size=1)
        self.calibrate_obj_det_sub = rospy.Subscriber("/finger_sensor/calibrate_obj_det", Bool, self.set_calibrate)
        self.fai_sub = rospy.Subscriber("/finger_sensor/fai", FingerFAI, self.detect_touch)
        self.sai_sub = rospy.Subscriber("/finger_sensor/sai", FingerSAI, self.detect_object)
        self.object_det_pub = rospy.Publisher("/finger_sensor/obj_detected", FingerDetect, queue_size=1)
        self.touch_pub = rospy.Publisher("/finger_sensor/touch", FingerTouch, queue_size=1)
manager.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _calibrate(self):
        self.state = "calibrate"
        self.alignment_pub.publish(Bool(True))
manager.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _perceive(self):
        self.state = "perceive"
        rospy.loginfo("Asking for perception...")
        self.perception_pub.publish(Bool(True))
manager.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _stop_perceive(self):
        self.state = "perceive"
        self.perception_pub.publish(Bool(False))
action_database.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self,object_list,perception_time=2.5):
        smach.State.__init__(self, outcomes=['objects_found', 'objects_not_found'],
                            output_keys=['pick_obj_name_out','place_obj_name_out'])

        self.perception_pub = rospy.Publisher("/perception/enabled",
                                              Bool,
                                              queue_size=1)

        self.listen = tf.TransformListener()
        self.perception_time = perception_time
        self.object_list = object_list
action_database.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self,pos=None):
        smach.State.__init__(self, outcomes=['calibrated','not_calibrated'])
        self.calibrate_pub = rospy.Publisher('/finger_sensor/calibrate_obj_det',
                                            Bool,
                                            queue_size=1)
        self.calibrated_sub = rospy.Subscriber('/finger_sensor/obj_det_calibrated',
                                                Bool,
                                                self.set_calibrated)
        self.calibrated = None
        self.finger_pos = pos
        self.jgn = JacoGripper()
perception.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self):
        super(PerceptionNode, self).__init__('p_node')

        self.transition_table = {
            # If calibration has already happened, allow skipping it
            'initial': {'q': self._perceive},
            'perceive': {'q': self._post_perceive},
            }

        # Hardcoded place for now
        self.tl = tf.TransformListener()
        self.num_objects = 0
        # Would this work too? Both tf and tf2 have (c) 2008...
        # self.tf2 = tf2_ros.TransformListener()
        self.n_objects_sub = rospy.Subscriber("/num_objects", Int64,
                                              self.update_num_objects,
                                              queue_size=1)
        self.perception_pub = rospy.Publisher("/perception/enabled",
                                              Bool,
                                              queue_size=1)

        self.br = tf.TransformBroadcaster()

        # Dict to map imarker names and their updated poses
        self.int_markers = {}

        # Ideally this call would be in a Factory/Metaclass/Parent
        self.show_options()
perception.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _perceive(self):
        self.state = "perceive"
        rospy.loginfo("Asking for perception...")
        self.perception_pub.publish(Bool(True))
ergo.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
            self.params = json.load(f)
        self.button = Button(self.params)
        self.rate = rospy.Rate(self.params['publish_rate'])
        self.button.switch_led(False)

        # Service callers
        self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name'])
        self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name'])
        rospy.loginfo("Ergo node is waiting for poppy controllers...")
        rospy.wait_for_service(self.robot_reach_srv_name)
        rospy.wait_for_service(self.robot_compliant_srv_name)
        self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget)
        self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant)
        rospy.loginfo("Controllers connected!")

        self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1)
        self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1)

        self.goals = []
        self.goal = 0.
        self.joy_x = 0.
        self.joy_y = 0.
        self.motion_started_joy = 0.
        self.js = JointState()
        rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy)
        rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js)
        rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led)

        self.t = rospy.Time.now()
        self.srv_reset = None
        self.extended = False
        self.standby = False
        self.last_activity = rospy.Time.now()
        self.delta_t = rospy.Time.now()
ergo.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def publish_button(self):
        self.button_pub.publish(Bool(data=self.button.pressed))
learning.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def publish(self):
        if self.learning is not None:
            with self.lock_iteration:
                focus = copy(self.focus)
                ready = copy(self.ready_for_interaction)

            self.pub_ready.publish(Bool(data=ready))
            self.pub_user_focus.publish(String(data=focus if focus is not None else ""))
            self.pub_focus.publish(String(data=self.learning.get_last_focus()))
            self.pub_iteration.publish(UInt32(data=self.learning.get_iterations()))


    ################################# Service callbacks
perception.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def record(self, human_demo, nb_points):
        call = self.services['record']['call']
        return call(RecordRequest(human_demo=Bool(data=human_demo), nb_points=UInt8(data=nb_points)))


问题


面经


文章

微信
公众号

扫码关注公众号