python类Rate()的实例源码

2017_Task1.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def goto_cup(self):
        # self.calibrate_obj_det_pub.publish(True)
        #
        # while self.calibrated == False:
        #     pass
        #
        # print("Finger Sensors calibrated")
        rate = rospy.Rate(100)
        while not rospy.is_shutdown():
            try:
                trans = self.tfBuffer.lookup_transform('root', 'teaCup_position', rospy.Time())
                break
            except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException):
                rate.sleep()
            # continue

        translation  = [trans.transform.translation.x, trans.transform.translation.y, trans.transform.translation.z]
        rotation = [trans.transform.rotation.x, trans.transform.rotation.y, trans.transform.rotation.z, trans.transform.rotation.w]
        pose_value = translation + rotation
        #second arg=0 (absolute movement), arg = '-r' (relative movement)
        self.cmmnd_CartesianPosition(pose_value, 0)
2017_Task5.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def cmmnd_makeContact_USBPlug(self, sensitivity):
        rate = rospy.Rate(100)
        while (self.bump_finger_1 < sensitivity)and not rospy.is_shutdown():
            print (self.bump_finger_1)
            self.cmmnd_CartesianVelocity([-0.03,0,0,0,0,0,1])
            rate.sleep()
        print ("contact made with the ground")

    # def pick_USBlight_1(self, current_finger_position):
    #     ii = 0
    #     rate = rospy.Rate(100)
    #     while self.touch_finger_3 != True and not rospy.is_shutdown():
    #         current_finger_position[0] += 5 # slowly close finger_1 until contact is made
    #         print (current_finger_position[0])
    #         self.cmmnd_FingerPosition([current_finger_position[0], current_finger_position[1], current_finger_position[2]])
    #         rate.sleep()
    #     self.touch_finger_1 = False
    #     return current_finger_position
2017_Task7.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def cmmd_touchBlock(self,current_finger_position):
        ii = 0
        rate = rospy.Rate(100)
        while self.touch_finger_1 != True and not rospy.is_shutdown():
            current_finger_position[0] += 5 # slowly close finger_1 until contact is made
            # print (current_finger_position[0])
            self.cmmnd_FingerPosition([current_finger_position[0], current_finger_position[1], current_finger_position[2]])
            rate.sleep()
        self.touch_finger_1 = False

        while self.touch_finger_2 != True and not rospy.is_shutdown():
            current_finger_position[1] += 5 # slowly close finger_1 until contact is made
            # print (current_finger_position[0])
            self.cmmnd_FingerPosition([current_finger_position[0], current_finger_position[1], current_finger_position[2]])
            rate.sleep()
        self.touch_finger_2 = False

        return current_finger_position
2017_Task3.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def lift_spoon(self):

        rate = rospy.Rate(100) # NOTE to publish cmmds to velocity_pub at 100Hz
        while self.touch_finger_3 != True:
            self.cmmnd_CartesianVelocity([0,0.025,0,0,0,0,1])
            rate.sleep()
        self.touch_finger_3 = False

        # p.cmmnd_CartesianPosition([0.02,0,0,0,0,0,1],'-r')

        self.cmmnd_FingerPosition([0, 0, 60])
        self.cmmnd_FingerPosition([100, 0, 60])
        self.cmmnd_FingerPosition([100, 0, 100])
        self.cmmnd_FingerPosition([100, 100, 100])
        self.cmmnd_CartesianPosition([0, 0, 0.13, 0, 0, 0, 1],'-r')
        # self.cmmnd_FingerPosition([100, 100, 100])
task_5.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def search_USBlight(self):
        if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
            # print ("we are in the search spoon fucntion")
            self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
            t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
            translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
            matrix1=self.listen.fromTranslationRotation(translation,quaternion)
            counter=0
            rate=rospy.Rate(100)
            while not self.obj_det:
                  counter = counter + 1
                  if(counter < 200):
                    cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
                    cart_velocities = cart_velocities.T[0].tolist()
                    self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
                  else:
                    cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,-0.05])[np.newaxis].T)
                    cart_velocities = cart_velocities.T[0].tolist()
                    self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
                  rate.sleep()
                  if(counter >400):
                     counter=0
task_7.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def search_straw(self):
        if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
            # print ("we are in the search spoon fucntion")
            self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
            t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
            translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
            matrix1=self.listen.fromTranslationRotation(translation,quaternion)
            counter=0
            rate=rospy.Rate(100)
            while not self.obj_det:
                  counter = counter + 1
                  if(counter < 200):
                    cart_velocities = np.dot(matrix1[:3,:3],np.array([00,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
                    cart_velocities = cart_velocities.T[0].tolist()
                    self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
                  else:
                    cart_velocities = np.dot(matrix1[:3,:3],np.array([0.0,0,-.05])[np.newaxis].T)
                    cart_velocities = cart_velocities.T[0].tolist()
                    self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
                  rate.sleep()
                  if(counter >400):
                     counter=0
task_3.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def searchSpoon(self):
        if self.listen.frameExists("/j2n6a300_end_effector") and self.listen.frameExists("/root"):
            # print ("we are in the search spoon fucntion")
            self.listen.waitForTransform('/j2n6a300_end_effector','/root',rospy.Time(),rospy.Duration(100.0))
            t = self.listen.getLatestCommonTime("/j2n6a300_end_effector","/root")
            translation, quaternion = self.listen.lookupTransform("/j2n6a300_end_effector","/root",t)
            matrix1=self.listen.fromTranslationRotation(translation,quaternion)
            counter=0
            rate=rospy.Rate(100)
            while not self.obj_det:
                  counter = counter + 1
                  if(counter < 200):
                    cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,0.05])[np.newaxis].T) #change in y->x, z->y, x->z
                    cart_velocities = cart_velocities.T[0].tolist()
                    self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
                  else:
                    cart_velocities = np.dot(matrix1[:3,:3],np.array([0,0,-0.05])[np.newaxis].T)
                    cart_velocities = cart_velocities.T[0].tolist()
                    self.cmmnd_CartesianVelocity(cart_velocities + [0,0,0,1])
                  rate.sleep()
                  if(counter >400):
                     counter=0
action_database.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def back_forth_search_xy(self):
        rate = rospy.Rate(100)
        for i in range(100):
            if np.all(np.array(self.finger_detect) == np.array(self.detect_goal)):
                return 'found'
            msg = self.create_pose_velocity_msg([0.0,-0.05,0.0,0.0,0.0,0.0])
            self.jn.kinematic_control(msg)
            rate.sleep()
        for i in range(200):
            if np.all(np.array(self.finger_detect)[[self.fingers]] == np.array(self.detect_goal)):
                return 'found'
            msg = self.create_pose_velocity_msg([0.0,0.05,0.0,0.0,0.0,0.0])
            self.jn.kinematic_control(msg)
            rate.sleep()
        for i in range(100):
            if np.all(np.array(self.finger_detect) == np.array(self.detect_goal)):
                return 'found'
            msg = self.create_pose_velocity_msg([0.0,-0.05,0.0,0.0,0.0,0.0])
            self.jn.kinematic_control(msg)
            rate.sleep()
        return 'not_found'
issue_pos.py 文件源码 项目:gps 作者: cbfinn 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def main():
    rospy.init_node('issue_com')
    pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10)
    test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
    sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen)
    #sub = rospy.Subscriber('/joint_states', JointState, listen)

    pc = PositionCommand()
    pc.mode = JOINT_SPACE
    #pc.arm = PositionCommand.LEFT_ARM
    pc.arm = 1#PositionCommand.RIGHT_ARM
    pc.data = np.zeros(7)

    r = rospy.Rate(1)
    #while not rospy.is_shutdown():
    #    pub.publish(pc)
    #    r.sleep()
    #    print 'published!'
    r.sleep()
    test_pub.publish(Empty())
    pub.publish(pc)
get_hand_gestures.py 文件源码 项目:tbotnav 作者: patilnabhi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def depth_callback(self, ros_image):
        try:
            inImg = self.bridge.imgmsg_to_cv2(ros_image)
        except CvBridgeError, e:
            print e

        inImgarr = np.array(inImg, dtype=np.uint16)
        # inImgarr = cv2.GaussianBlur(inImgarr, (3, 3), 0)
        # cv2.normalize(inImgarr, inImgarr, 0, 1, cv2.NORM_MINMAX) 

        self.outImg, self.num_fingers = self.process_depth_image(inImgarr) 
        # outImg = self.process_depth_image(inImgarr) 
        # rate = rospy.Rate(10)        
        self.num_pub.publish(self.num_fingers)
        # self.img_pub.publish(self.bridge.cv2_to_imgmsg(self.outImg, "bgr8"))
        # rate.sleep()

        cv2.imshow("Hand Gesture Recognition", self.outImg)
        cv2.waitKey(3)
hardware_joystick_publisher.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self):
        self.joy_pub = rospy.Publisher('sensors/joystick/1', Joy, queue_size=1)
        self.joy_pub2 = rospy.Publisher('sensors/joystick/2', Joy, queue_size=1)

        self.rospack = RosPack()

        self.rate = rospy.Rate(20)

        count = len(devices.gamepads)

        if count < 2:
            rospy.logerr("Sensors: Expecting 2 joysticks but found only {}, exiting".format(count))
            sys.exit(-1)

        gamepads = sorted(devices.gamepads, key=lambda pad: int(str(pad.name)[-1]))
        rospy.loginfo(gamepads)
        self.joysticks = [JoystickThread(pad) for pad in gamepads]
        [joystick.start() for joystick in self.joysticks]
environment.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        # Load parameters and hack the tuple conversions so that OpenCV is happy
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'environment.json')) as f:
            self.params = json.load(f)
        self.params['tracking']['ball']['lower'] = tuple(self.params['tracking']['ball']['lower'])
        self.params['tracking']['ball']['upper'] = tuple(self.params['tracking']['ball']['upper'])
        self.params['tracking']['arena']['lower'] = tuple(self.params['tracking']['arena']['lower'])
        self.params['tracking']['arena']['upper'] = tuple(self.params['tracking']['arena']['upper'])

        self.tracking = BallTracking(self.params)
        self.conversions = EnvironmentConversions()
        self.ball_pub = rospy.Publisher('environment/ball', CircularState, queue_size=1)
        self.light_pub = rospy.Publisher('environment/light', UInt8, queue_size=1)
        self.image_pub = rospy.Publisher('environment/image', Float32MultiArray, queue_size=1)
        self.sound_pub = rospy.Publisher('environment/sound', Float32, queue_size=1)
        self.rate = rospy.Rate(self.params['rate'])
torso.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
            self.params = json.load(f)

        self.publish_rate = rospy.Rate(self.params['publish_rate'])
        self.demo = rospy.get_param('demo_mode')

        # Protected resources
        self.in_rest_pose = False
        self.robot_lock = RLock()

        # Used services
        self.torso = TorsoServices(self.params['robot_name'])

        # Proposed services
        self.reset_srv_name = 'torso/reset'
        self.reset_srv = None
perception.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def wait_for_human_interaction(self, arm_threshold=1, joystick_threshold=0.15):
        rospy.loginfo("We are waiting for human interaction...")

        def detect_arm_variation():
            new_effort = np.array(self.topics.torso_l_j.effort)
            delta = np.absolute(effort - new_effort)
            return np.amax(delta) > arm_threshold

        def detect_joy_variation():
            return np.amax(np.abs(self.topics.joy1.axes)) > joystick_threshold

        effort = np.array(self.topics.torso_l_j.effort)
        rate = rospy.Rate(50)
        is_joystick_demo = None
        while not rospy.is_shutdown():
            if detect_arm_variation():
                is_joystick_demo = False
                break
            elif detect_joy_variation():
                is_joystick_demo = True
                break
            rate.sleep()
        return is_joystick_demo

    ################################# Service callbacks
issue_pos.py 文件源码 项目:lsdc 作者: febert 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def main():
    rospy.init_node('issue_com')
    pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10)
    test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
    sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen)
    #sub = rospy.Subscriber('/joint_states', JointState, listen)

    pc = PositionCommand()
    pc.mode = JOINT_SPACE
    #pc.arm = PositionCommand.LEFT_ARM
    pc.arm = 1#PositionCommand.RIGHT_ARM
    pc.data = np.zeros(7)

    r = rospy.Rate(1)
    #while not rospy.is_shutdown():
    #    pub.publish(pc)
    #    r.sleep()
    #    print 'published!'
    r.sleep()
    test_pub.publish(Empty())
    pub.publish(pc)
client.py 文件源码 项目:ros-interop 作者: mcgill-robotics 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def wait_for_server(self):
        """Waits until interoperability server is reachable."""
        reachable = False
        rate = rospy.Rate(1)
        while not reachable and not rospy.is_shutdown():
            try:
                response = requests.get(
                    self.url, timeout=self.timeout, verify=self.verify)
                response.raise_for_status()
                reachable = response.ok
            except requests.ConnectionError:
                rospy.logwarn_throttle(5.0, "Waiting for server: {}".format(
                    self.url))
            except Exception as e:
                rospy.logerr_throttle(
                    5.0, "Unexpected error waiting for server: {}, {}".format(
                        self.url, e))

            if not reachable:
                rate.sleep()
master_sync.py 文件源码 项目:multimaster_udp 作者: AlexisTM 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def spin(self):
        # @todo: is this excessively hitting the master?
        r = rospy.Rate(10.0)

        while not rospy.is_shutdown():
            for srv in self._local_services:
                srv_uri = self._local_manager.lookup_service(srv)
                if srv_uri:
                    self._foreign_manager.advertise_service(srv, srv_uri)
                else:
                    self._foreign_manager.unadvertise_service(srv)

            for srv in self._foreign_services:
                srv_uri = self._foreign_manager.lookup_service(srv)
                if srv_uri:
                    self._local_manager.advertise_service(srv, srv_uri)
                else:
                    self._local_manager.unadvertise_service(srv)

            r.sleep()

        if self._local_manager:
            self._local_manager.unsubscribe_all()
        if self._foreign_manager:
            self._foreign_manager.unsubscribe_all()
operate_recognize_object.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def __init__(self,dis=0.0):
        State.__init__(self, outcomes=['succeeded','preempted','aborted'],input_keys=['target'])
        rospy.on_shutdown(self.shutdown)
        self.test_distance = target.y-dis
        self.rate = 200
        self.r = rospy.Rate(self.rate)
        self.speed = rospy.get_param('~speed',0.08)
        self.tolerance = rospy.get_param('~tolerance', 0.01)
        self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        self.tf_listener = tf.TransformListener()
        rospy.sleep(1)
        self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
        #define a bianliang
        self.flag = rospy.get_param('~flag', True)
        rospy.loginfo(self.test_distance)
        #tf get position

        #publish cmd_vel
operate_recognize_object.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self,angle=0):
        State.__init__(self, outcomes=['succeeded','aborted','preempted'],input_keys=["angle_in"])
        self.speed = rospy.get_param('~speed', 0.10)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)

        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
object_smach.py 文件源码 项目:xm_strategy 作者: xm-project 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, angle=0):
        State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'], input_keys=["angle_in"])
        self.speed = rospy.get_param('~speed', 0.03)
        self.secretindigal = 1.0
        self.tolerance = rospy.get_param('tolerance', math.radians(5))
        self.start = True
        self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
        self.base_frame = rospy.get_param('~base_frame', '/base_link')
        self.odom_frame = rospy.get_param('~odom_frame', '/odom')
        rospy.on_shutdown(self.shutdown)
        self.rate = 30
        self.start_test = True
        self.r = rospy.Rate(self.rate)
        self.angle=angle
        self.tf_listener = tf.TransformListener()
        rospy.sleep(0.5)
        self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))


问题


面经


文章

微信
公众号

扫码关注公众号