python类JointState()的实例源码

ergo.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
            self.params = json.load(f)
        self.button = Button(self.params)
        self.rate = rospy.Rate(self.params['publish_rate'])
        self.button.switch_led(False)

        # Service callers
        self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name'])
        self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name'])
        rospy.loginfo("Ergo node is waiting for poppy controllers...")
        rospy.wait_for_service(self.robot_reach_srv_name)
        rospy.wait_for_service(self.robot_compliant_srv_name)
        self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget)
        self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant)
        rospy.loginfo("Controllers connected!")

        self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1)
        self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1)

        self.goals = []
        self.goal = 0.
        self.joy_x = 0.
        self.joy_y = 0.
        self.motion_started_joy = 0.
        self.js = JointState()
        rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy)
        rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js)
        rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led)

        self.t = rospy.Time.now()
        self.srv_reset = None
        self.extended = False
        self.standby = False
        self.last_activity = rospy.Time.now()
        self.delta_t = rospy.Time.now()
ergo.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def reach(self, target, duration):
        js = JointState()
        js.name = target.keys()
        js.position = target.values()
        self.reach_proxy(ReachTargetRequest(target=js,
                                            duration=rospy.Duration(duration)))
joints.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ros_to_list(joints):
    assert isinstance(joints, JointState)
    return joints.position
joints.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def list_to_ros(joints):
    return JointState(position=list(joints))
services.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def reach(self, positions, duration):
        self.reach_proxy(ReachTargetRequest(target=JointState(name=positions.keys(),
                                                              position=positions.values()),
                                            duration=rospy.Duration(duration)))
check_jac.py 文件源码 项目:lsdc 作者: febert 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    rospy.init_node('issue_com')
    pub = rospy.Publisher(TRIAL_COM_TOPIC, TrialCommand, queue_size=10)
    test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
    sub = rospy.Subscriber(POS_COM_TOPIC, TrialCommand, listen)
    sub2 = rospy.Subscriber(RESULT_TOPIC, SampleResult, listen_report)
    #sub = rospy.Subscriber('/joint_states', JointState, listen)

    tc = TrialCommand()
    T = 1
    tc.controller = get_lin_gauss_test(T=T)
    tc.T = T
    tc.frequency = 20.0
    # NOTE: ordering of datatypes in state is determined by the order here
    tc.state_datatypes = [JOINT_ANGLES, JOINT_VELOCITIES]
    tc.obs_datatypes = tc.state_datatypes
    tc.ee_points = EE_SITES.reshape(EE_SITES.size).tolist()

    r = rospy.Rate(1)
    #while not rospy.is_shutdown():
    #    pub.publish(pc)
    #    r.sleep()
    #    print 'published!'
    r.sleep()
    test_pub.publish(Empty())
    pub.publish(tc)
    rospy.spin()
old_ar10_rviz_control_node.py 文件源码 项目:AR10 作者: Active8Robots 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():

    def listener():


        rospy.init_node('listener', anonymous=True) # defines anonymous listener node
        rospy.Subscriber('joint_states',JointState,callback)
        rospy.spin()  # spin() simply keeps python from exiting until this node is stopped

    def callback(msg): # callback is executed when a message is published to 'joint_states'
        pos = [1,1,1,1,1,1,1,1,1,1] # creates an array of 10 to store joint_states
        for i in range (0,10):      # for all servos ...
          pos[i]=msg.position[i+14]  #stores joint states to pos[] while bypassing the initial 14 passive revolute joints
          pos[i]=converter(pos[i])   #converts servo posisitons into commands for the AR10 hand
          print int(pos[i])
        hand.move(0,int(pos[0]))     #sends commands to the AR10
        hand.move(1,int(pos[1]))
        hand.move(2,int(pos[2]))
        hand.move(3,int(pos[3]))
        hand.move(4,int(pos[4]))
            hand.move(5,int(pos[5]))
            hand.move(6,int(pos[6]))
            hand.move(7,int(pos[7]))
            hand.move(8,int(pos[8]))
            hand.move(9,int(pos[9]))
        return pos


    def converter(pos):              #Converter that is executed when called in callback
        command_value = (((pos*-3500)/0.02)+8000)
        return command_value


    hand = ar10() # create hand object
    listener() # start subscriber
simulate_state.py 文件源码 项目:AS_6Dof_Arm 作者: yao62995 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self):
        self.joint_state = JointState()
        self.arm_map = dict()
        self.joint_pub = rospy.Publisher("/as_arm/set_joints_states", JointState, queue_size=1)
        self.pose_sub = rospy.Subscriber("/as_arm/joint_states", JointState, callback=self.callback_joint, queue_size=1)
        # end effector service
        self.check_collision_client = rospy.ServiceProxy('/check_collision', CheckCollisionValid)
        self.tf_listener = tf.TransformListener()
test_rviz_arm_joint.py 文件源码 项目:AS_6Dof_Arm 作者: yao62995 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self):
        self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
                            'finger_joint1', 'finger_joint2']
        # subscriber for setting joints position
        self.joint_state = JointState()
        self.states_pub = rospy.Publisher("/joint_states", JointState, queue_size=1)

        print "======joint range:"
        for name in self.joint_names[:5]:
            v = self.JointMap[name]
            v['range'][0] *= DEG_TO_RAD
            v['range'][1] *= DEG_TO_RAD
            print '\t', name, v['range']
test_gazebo_arm_joint.py 文件源码 项目:AS_6Dof_Arm 作者: yao62995 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
                            ]
        # subscriber for setting joints position
        self.joint_state = JointState()
        self.states_pub = rospy.Publisher("/as_arm/set_joints_states", JointState, queue_size=1)

        print "======joint range:"
        for name in self.joint_names[:5]:
            v = self.JointMap[name]
            v['range'][0] *= DEG_TO_RAD
            v['range'][1] *= DEG_TO_RAD
            print '\t', name, v['range']
arm_joint_pose.py 文件源码 项目:AS_6Dof_Arm 作者: yao62995 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self):
        self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
                            'finger_joint1', 'finger_joint2']
        # subscriber for setting joints position
        self.states_sub = rospy.Subscriber("/as_arm/set_joints_states", JointState, self.callback,
                                           queue_size=2)
        # a list of publisher
        self.joint_pub = dict()
        self.joint_pose = dict()
        for idx, name in enumerate(self.joint_names):
            pub = rospy.Publisher("/as_arm/joint%d_position_controller/command" % (idx + 1), Float64, queue_size=3)
            self.joint_pub[name] = pub
            self.joint_pose[name] = Float64()
arm_joint_sync.py 文件源码 项目:AS_6Dof_Arm 作者: yao62995 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self):
        self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
                            ]
        # subscriber for setting joints position
        self.joint_state = JointState()
        self.states_pub = rospy.Publisher("/joint_states", JointState, queue_size=1)

        self.pose_sub = rospy.Subscriber("/as_arm/joint_states", JointState, callback=self.callback_joint, queue_size=1)
        print "======joint range:"
        for name in self.joint_names[:5]:
            v = self.JointMap[name]
            v['range'][0] *= DEG_TO_RAD
            v['range'][1] *= DEG_TO_RAD
            print '\t', name, v['range']
rviz_bridge.py 文件源码 项目:bluerov_ffg 作者: freefloating-gazebo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):

        self.odom = Odometry()
        self.odom_received = False
        self.odom_sub = rospy.Subscriber('state', Odometry, self.OdomCallBack) 

        self.thruster_received = False
        self.thruster_sub = rospy.Subscriber('thruster_command', JointState, self.ThrusterCallBack)
human_model.py 文件源码 项目:human_moveit_config 作者: baxter-flowers 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, description='human_description', prefix='human', control=False):
        self.description = description
        self.robot_commander = RobotCommander(description)
        if control:
            self.joint_publisher = rospy.Publisher('/human/set_joint_values', JointState, queue_size=1)
        self.groups = {}
        self.groups['head'] = MoveGroupCommander('Head', description)
        self.groups['right_arm'] = MoveGroupCommander('RightArm', description)
        self.groups['left_arm'] = MoveGroupCommander('LeftArm', description)
        self.groups['right_leg'] = MoveGroupCommander('RightLeg', description)
        self.groups['left_leg'] = MoveGroupCommander('LeftLeg', description)
        self.groups['upper_body'] = MoveGroupCommander('UpperBody', description)
        self.groups['lower_body'] = MoveGroupCommander('LowerBody', description)
        self.groups['whole_body'] = MoveGroupCommander('WholeBody', description)
        # initialize end-effectors dict
        self.end_effectors = {}
        # fill both dict
        for key, value in self.groups.iteritems():
            self.end_effectors[key] = value.get_end_effector_link()
        # add the list of end-effectors for bodies
        self.end_effectors['upper_body'] = [self.end_effectors['head'],
                                            self.end_effectors['right_arm'],
                                            self.end_effectors['left_arm']]
        self.end_effectors['lower_body'] = [self.end_effectors['right_leg'],
                                            self.end_effectors['left_leg']]
        self.end_effectors['whole_body'] = self.end_effectors['upper_body'] + self.end_effectors['lower_body']
        self.prefix = prefix
        self.urdf_reader = URDFReader()

        rospy.wait_for_service('compute_fk')
        self.compute_fk = rospy.ServiceProxy('compute_fk', GetPositionFK)
        self.current_state = self.get_initial_state()
human_model.py 文件源码 项目:human_moveit_config 作者: baxter-flowers 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def inverse_kinematic(self, desired_poses, fixed_joints={}, tolerance=0.001, group_names='whole_body', seed=None):
        def compute_ik_client():
            rospy.wait_for_service('compute_human_ik')
            try:
                compute_ik = rospy.ServiceProxy('compute_human_ik', GetHumanIK)
                res = compute_ik(poses, fixed_joint_state, tolerance, group_names, seed)
                return res.joint_state
            except rospy.ServiceException, e:
                print "Service call failed: %s" % e
                # in case of failure return T pose
                return self.get_initial_state()
        if seed is None:
            seed = self.get_current_state()
        if group_names is not list:
            group_names = [group_names]
        # convert the desired poses to PoseStamped
        poses = []
        for key, value in desired_poses.iteritems():
            pose = transformations.list_to_pose(value)
            pose.header.frame_id = key
            poses.append(pose)
        # convert the fixed joints to joint state
        fixed_joint_state = JointState()
        for key, value in fixed_joints.iteritems():
            fixed_joint_state.name += [key]
            fixed_joint_state.position += [value]
        return compute_ik_client()
human_model.py 文件源码 项目:human_moveit_config 作者: baxter-flowers 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def jacobian(self, group_name, joint_state, use_quaternion=False, link=None, ref_point=None):
        def compute_jacobian_srv():
            rospy.wait_for_service('compute_jacobian')
            try:
                compute_jac = rospy.ServiceProxy('compute_jacobian', GetJacobian)
                js = JointState()
                js.name = self.get_joint_names(group_name)
                js.position = self.extract_group_joints(group_name, joint_state)
                p = Point(x=ref_point[0], y=ref_point[1], z=ref_point[2])
                # call the service
                res = compute_jac(group_name, link, js, p, use_quaternion)
                # reorganize the jacobian
                jac_array = np.array(res.jacobian).reshape((res.nb_rows, res.nb_cols))
                # reorder the jacobian wrt to the joint state
                ordered_jac = np.zeros((len(jac_array), len(joint_state.name)))
                for i, n in enumerate(js.name):
                    ordered_jac[:, joint_state.name.index(n)] = jac_array[:, i]
                return ordered_jac
            except rospy.ServiceException, e:
                print "Service call failed: %s" % e
        #  compute the jacobian only for chains
        # if group_name not in ['right_arm', 'left_arm', 'head', 'right_leg', 'left_leg']:
        #     rospy.logerr('The Jacobian matrix can only be computed on kinematic chains')
        #     return []
        # assign values
        if link is None:
            link = self.end_effectors[group_name]
        if ref_point is None:
            ref_point = [0, 0, 0]
        # return the jacobian
        return compute_jacobian_srv()
check_jac.py 文件源码 项目:gps_superball_public 作者: young-geng 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    rospy.init_node('issue_com')
    pub = rospy.Publisher(TRIAL_COM_TOPIC, TrialCommand, queue_size=10)
    test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
    sub = rospy.Subscriber(POS_COM_TOPIC, TrialCommand, listen)
    sub2 = rospy.Subscriber(RESULT_TOPIC, SampleResult, listen_report)
    #sub = rospy.Subscriber('/joint_states', JointState, listen)

    tc = TrialCommand()
    T = 1
    tc.controller = get_lin_gauss_test(T=T)
    tc.T = T
    tc.frequency = 20.0
    # NOTE: ordering of datatypes in state is determined by the order here
    tc.state_datatypes = [JOINT_ANGLES, JOINT_VELOCITIES]
    tc.obs_datatypes = tc.state_datatypes
    tc.ee_points = EE_SITES.reshape(EE_SITES.size).tolist()

    r = rospy.Rate(1)
    #while not rospy.is_shutdown():
    #    pub.publish(pc)
    #    r.sleep()
    #    print 'published!'
    r.sleep()
    test_pub.publish(Empty())
    pub.publish(tc)
    rospy.spin()
command_to_joint_state.py 文件源码 项目:carbot 作者: lucasw 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self):
        self.joint_name = rospy.get_param("~joint_name")
        self.joint_state = JointState()
        self.joint_state.name.append(self.joint_name)
        self.joint_state.position.append(0.0)
        self.joint_state.velocity.append(0.0)
        self.joint_pub = rospy.Publisher("joint_states", JointState, queue_size=1)
        self.command_sub = rospy.Subscriber("command", Float64,
                                            self.command_callback, queue_size=1)
abstract_limb.py 文件源码 项目:baxter_gui 作者: HumaRobotics 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def getDictFromAngles(angles):
    """
        Converts angles of a JointState() to a dictionary
        :param angles (sensor_msgs.msg.JointState): angles to be converted
        :return (dict): dictionary with angles 
    """

    d=dict(zip(angles.name,angles.position))
    return collections.OrderedDict(sorted( d.items() ))
abstract_limb.py 文件源码 项目:baxter_gui 作者: HumaRobotics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getStrFromAngles(angles):
    """
        Converts all angles of a JointState() to a printable string
        :param angles (sensor_msgs.msg.JointState): JointState() angles to be converted
        :return (string): string of the angles
    """
    d=getDictFromAngles(angles)
    return str( dict(d.items()))


问题


面经


文章

微信
公众号

扫码关注公众号