python类Bool()的实例源码

action_manager.py 文件源码 项目:RobotLearning 作者: AmiiThinks 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self):
        """Send an action at a 40Hz cycle."""
        rospy.init_node('action_manager', anonymous=True)
        rospy.Subscriber('action_cmd', Twist, self.update_action)
        rospy.Subscriber('termination', Bool, self.set_termination_flag)
        rospy.Subscriber('pause', Bool, self.set_pause_flag)

        action_publisher = rospy.Publisher('cmd_vel_mux/input/teleop',
                                           Twist,
                                           queue_size=1)

        action_pub_rate = rospy.Rate(40)

        while not rospy.is_shutdown():
            if self.termination_flag:
                break
            if self.pause_flag is False:
                # log action
                speeds = (self.action.linear.x, self.action.angular.z)
                actn = "linear: {}, angular: {}".format(*speeds)
                rospy.logdebug("Sending action to Turtlebot: {}".format(actn))

                # send new actions
                if self.stop_once:
                    action_publisher.publish(self.STOP_ACTION)
                    self.stop_once = False
                else:
                    action_publisher.publish(self.action)
            action_pub_rate.sleep()
light.py 文件源码 项目:arm_scenario_simulator 作者: cmaestre 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def publish_state(self):
        pub = rospy.Publisher('/env/'+self.name+'/lamp/visual/get_state', Bool, queue_size=1)
        rate = rospy.Rate(50) # 50hz
        while not rospy.is_shutdown():
            pub.publish(self._on)
            rate.sleep()
camdector.py 文件源码 项目:nimo 作者: wolfram2012 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def detect_and_visualize(self, root_dir=None, extension=None,
                             classes=[], thresh=0.6, show_timer=False):

        global imgi
        global img_rect
        global Frame
        global show
        global trackpos
        global imshow
        global acc_pub
        global acc_enable

        acc_pub = rospy.Publisher("acc_cmd", ACCCommand, queue_size=2)
        acc_enable = rospy.Publisher("acc_enable", Bool, queue_size=2)
        # rospy.Timer(rospy.Duration(0.02), self.trackCallback)
        rospy.Subscriber("/zed/left/image_rect_color/compressed",CompressedImage, self.ImgCcallback,  queue_size = 4)
        # rospy.Subscriber("/zed/left/image_rect_color",Image, self.Imgcallback,  queue_size = 4)
        rospy.Subscriber("/zed/depth/depth_registered",Image, self.DepthImgcallback,  queue_size = 4)
        im_path = '/home/wolfram/mxnet/example/ssd/data/demo/dog.jpg'
        with open(im_path, 'rb') as fp:
            img_content = fp.read()

        trackpos = 0
        imshow = 0
        imgi = mx.img.imdecode(img_content)
        while(1):

            # ret,img_rect = cap.read()
            dets = self.im_detect(root_dir, extension, show_timer=show_timer)
            # if not isinstance(im_list, list):
            #     im_list = [im_list]
            # assert len(dets) == len(im_list)
            # for k, det in enumerate(dets):

            # img[:, :, (0, 1, 2)] = img[:, :, (2, 1, 0)]
            # img = img_rect.copy()
            self.visualize_detection(img_rect, dets[0], classes, thresh)

            if imshow == 1:
                cv2.imshow("tester", show)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        cap.release()
        cv2.destroyAllWindows()
motion_arbiter.py 文件源码 项目:mhri 作者: mhri 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        self.is_rendering = False

        rospy.loginfo('\033[91m[%s]\033[0m waiting for bringup social_mind...'%rospy.get_name())
        self.rd_memory = {}
        try:
            rospy.wait_for_service('social_events_memory/read_data')
            self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
            rospy.wait_for_service('environmental_memory/read_data')
            self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData)
            rospy.wait_for_service('system_events_memory/read_data')
            self.rd_memory['system_events_memory'] = rospy.ServiceProxy('system_events_memory/read_data', ReadData)
        except rospy.exceptions.ROSInterruptException as e:
            rospy.logerr(e)
            quit()

        self.renderer_client = actionlib.SimpleActionClient('render_scene', RenderSceneAction)
        rospy.loginfo('\033[91m[%s]\033[0m waiting for motion_renderer to start...'%rospy.get_name())

        try:
            self.renderer_client.wait_for_server()
        except rospy.exceptions.ROSInterruptException as e:
            quit()

        rospy.Subscriber('reply', Reply, self.handle_domain_reply)
        self.pub_log_item = rospy.Publisher('log', LogItem, queue_size=10)

        self.pub_start_speech_recognizer = rospy.Publisher('speech_recognizer/start', Empty, queue_size=1)
        self.pub_stop_speech_recognizer = rospy.Publisher('speech_recognizer/stop', Empty, queue_size=1)

        self.pub_start_robot_speech = rospy.Publisher('robot_speech/start', Empty, queue_size=1)
        self.pub_stop_robot_speech = rospy.Publisher('robot_speech/stop', Empty, queue_size=1)

        self.is_speaking_started = False
        rospy.Subscriber('start_of_speech', Empty, self.handle_start_of_speech)
        rospy.Subscriber('end_of_speech', Empty, self.handle_end_of_speech)

        # rospy.Subscriber('emotion_status', EmotionStatus, self.handle_emotion_status, queue_size=10)
        # self.current_emotion = 'neutral'
        # self.current_emotion_intensity = 1.0

        self.pub_set_idle_motion = rospy.Publisher('idle_motion/set_enabled', Bool, queue_size=10)
        self.pub_set_idle_motion.publish(True)

        self.scene_queue = Queue.Queue(MAX_QUEUE_SIZE)
        self.scene_handle_thread = Thread(target=self.handle_scene_queue)
        self.scene_handle_thread.start()

        rospy.loginfo("\033[91m[%s]\033[0m initialized." % rospy.get_name())
contact.py 文件源码 项目:spqrel_tools 作者: LCAS 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, moduleName):
        # get connection from command line:
        from optparse import OptionParser

        parser = OptionParser()
        parser.add_option("--ip", dest="ip", default="",
                          help="IP/hostname of broker. Default is system's default IP address.", metavar="IP")
        parser.add_option("--port", dest="port", default=0,
                          help="IP/hostname of broker. Default is automatic port.", metavar="PORT")
        parser.add_option("--pip", dest="pip", default="127.0.0.1",
                          help="IP/hostname of parent broker. Default is 127.0.0.1.", metavar="IP")
        parser.add_option("--pport", dest="pport", default=9559,
                          help="port of parent broker. Default is 9559.", metavar="PORT")

        (options, args) = parser.parse_args()
        self.ip = options.ip
        self.port = int(options.port)
        self.pip = options.pip
        self.pport = int(options.pport)
        self.moduleName = moduleName

        self.init_almodule()

        # ROS initialization:
        rospy.init_node('naoqi_tactile')

        # init. messages:
        self.tactile = TactileTouch()              
        self.bumper = Bumper()
        self.tactilePub = rospy.Publisher("tactile_touch", TactileTouch, queue_size=1)
        self.bumperPub = rospy.Publisher("bumper", Bumper, queue_size=1)

        try:
            footContact = self.memProxy.getData("footContact", 0)
        except RuntimeError:
            footContact = None

        if footContact is None:
            self.hasFootContactKey = False
            rospy.loginfo("Foot contact key is not present in ALMemory, will not publish to foot_contact topic.")
        else:
            self.hasFootContactKey = True
            self.footContactPub = rospy.Publisher("foot_contact", Bool, latch=True, queue_size=1)
            self.footContactPub.publish(footContact > 0.0)

        # constants in TactileTouch and Bumper will not be available in callback functions
        # as they are executed in the parent broker context (i.e. on robot),
        # so they have to be copied to member variables
        self.tactileTouchFrontButton = TactileTouch.buttonFront;
        self.tactileTouchMiddleButton = TactileTouch.buttonMiddle;
        self.tactileTouchRearButton = TactileTouch.buttonRear;
        self.bumperRightButton = Bumper.right;
        self.bumperLeftButton = Bumper.left;

        self.subscribe()

        rospy.loginfo("nao_tactile initialized")
manager.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, robot, *robotargs):
        super(PickAndPlaceNode, self).__init__('pp_node')
    rospy.loginfo("PickAndPlaceNode")
        _post_perceive_trans = defaultdict(lambda: self._pick)
        _post_perceive_trans.update({'c': self._calibrate})
        _preplace = defaultdict(lambda: self._preplace)
        self.transition_table = {
            # If calibration has already happened, allow skipping it
            'initial': {'c': self._calibrate, 'q': self._perceive,
                        's': self._preplace},
            'calibrate': {'q': self._perceive, 'c': self._calibrate},
            'perceive': {'p': self._post_perceive, 'q': self._perceive, 's': self._stop_perceive, 'c': self._calibrate},
            'post_perceive': _post_perceive_trans,
            'postpick': {'1': self._level, '2': self._level, '9': self._level},
            'level': _preplace,
            'preplace': {'s': self._place},
            'place': {'q': self._perceive, 'c': self._calibrate}
            }
    rospy.loginfo("PickAndPlaceNode1")
        if callable(robot):
            self.robot = robot(*robotargs)
        else:
            self.robot = robot
        self.robot.level = 1
    rospy.loginfo("PickAndPlaceNode2")
        # Hardcoded place for now
        self.place_pose = PoseStamped(
            Header(0, rospy.Time(0), self.robot.base),
            Pose(Point(0.526025806, 0.4780144, -0.161326153),
                 Quaternion(1, 0, 0, 0)))
        self.tl = tf.TransformListener()
        self.num_objects = 0
        # Would this work too? Both tf and tf2 have (c) 2008...
        # self.tf2 = tf2_ros.TransformListener()
        self.n_objects_sub = rospy.Subscriber("/num_objects", Int64,
                                              self.update_num_objects,
                                              queue_size=1)
        self.perception_pub = rospy.Publisher("/perception/enabled",
                                              Bool,
                                              queue_size=1)
        self.alignment_pub = rospy.Publisher("/alignment/doit",
                                             Bool,
                                             queue_size=1)
        self.br = tf.TransformBroadcaster()
    rospy.loginfo("PickAndPlaceNode3")
        self.int_marker_server = InteractiveMarkerServer('int_markers')
        # Dict to map imarker names and their updated poses
        self.int_markers = {}

        # Ideally this call would be in a Factory/Metaclass/Parent
        self.show_options()
        self.perceive = False
        # self.robot.home()
        # self.move_calib_position()
learning.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'learning.json')) as f:
            self.params = json.load(f)

        self.translator = EnvironmentTranslator()
        self.learning = None

        # User control and critical resources
        self.lock_iteration = RLock()
        self.ready_for_interaction = True
        self.focus = None
        self.set_iteration = -1

        # Saved experiment files
        self.dir = join(self.rospack.get_path('apex_playground'), 'logs')
        if not os.path.isdir(self.dir):
            os.makedirs(self.dir)
        # self.stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        # self.source_file = join(self.dir, self.source_name + '.pickle')
        self.main_experiment = True
        self.condition = ""
        self.trial = ""
        self.task = ""

        # Serving these services
        self.service_name_perceive = "learning/perceive"
        self.service_name_produce = "learning/produce"
        self.service_name_set_interest = "learning/set_interest"
        self.service_name_set_iteration = "learning/set_iteration"
        self.service_name_interests = "learning/get_interests"

        # Publishing these topics
        self.pub_focus = rospy.Publisher('learning/current_focus', String, queue_size=1, latch=True)
        self.pub_user_focus = rospy.Publisher('learning/user_focus', String, queue_size=1, latch=True)
        self.pub_ready = rospy.Publisher('learning/ready_for_interaction', Bool, queue_size=1, latch=True)
        self.pub_iteration = rospy.Publisher('iteration', UInt32, queue_size=1, latch=True)

        # Using these services
        self.service_name_get_perception = "perception/get"
        for service in [self.service_name_get_perception]:
            rospy.loginfo("Learning  node is waiting service {}...".format(service))
            rospy.wait_for_service(service)
        self.get_state = rospy.ServiceProxy(self.service_name_get_perception, GetSensorialState)


问题


面经


文章

微信
公众号

扫码关注公众号