def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
self.params = json.load(f)
self.button = Button(self.params)
self.rate = rospy.Rate(self.params['publish_rate'])
self.button.switch_led(False)
# Service callers
self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name'])
self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name'])
rospy.loginfo("Ergo node is waiting for poppy controllers...")
rospy.wait_for_service(self.robot_reach_srv_name)
rospy.wait_for_service(self.robot_compliant_srv_name)
self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget)
self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant)
rospy.loginfo("Controllers connected!")
self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1)
self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1)
self.goals = []
self.goal = 0.
self.joy_x = 0.
self.joy_y = 0.
self.motion_started_joy = 0.
self.js = JointState()
rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy)
rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js)
rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led)
self.t = rospy.Time.now()
self.srv_reset = None
self.extended = False
self.standby = False
self.last_activity = rospy.Time.now()
self.delta_t = rospy.Time.now()
python类JointState()的实例源码
def reach(self, target, duration):
js = JointState()
js.name = target.keys()
js.position = target.values()
self.reach_proxy(ReachTargetRequest(target=js,
duration=rospy.Duration(duration)))
def ros_to_list(joints):
assert isinstance(joints, JointState)
return joints.position
def list_to_ros(joints):
return JointState(position=list(joints))
def reach(self, positions, duration):
self.reach_proxy(ReachTargetRequest(target=JointState(name=positions.keys(),
position=positions.values()),
duration=rospy.Duration(duration)))
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(TRIAL_COM_TOPIC, TrialCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, TrialCommand, listen)
sub2 = rospy.Subscriber(RESULT_TOPIC, SampleResult, listen_report)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
tc = TrialCommand()
T = 1
tc.controller = get_lin_gauss_test(T=T)
tc.T = T
tc.frequency = 20.0
# NOTE: ordering of datatypes in state is determined by the order here
tc.state_datatypes = [JOINT_ANGLES, JOINT_VELOCITIES]
tc.obs_datatypes = tc.state_datatypes
tc.ee_points = EE_SITES.reshape(EE_SITES.size).tolist()
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(tc)
rospy.spin()
def main():
def listener():
rospy.init_node('listener', anonymous=True) # defines anonymous listener node
rospy.Subscriber('joint_states',JointState,callback)
rospy.spin() # spin() simply keeps python from exiting until this node is stopped
def callback(msg): # callback is executed when a message is published to 'joint_states'
pos = [1,1,1,1,1,1,1,1,1,1] # creates an array of 10 to store joint_states
for i in range (0,10): # for all servos ...
pos[i]=msg.position[i+14] #stores joint states to pos[] while bypassing the initial 14 passive revolute joints
pos[i]=converter(pos[i]) #converts servo posisitons into commands for the AR10 hand
print int(pos[i])
hand.move(0,int(pos[0])) #sends commands to the AR10
hand.move(1,int(pos[1]))
hand.move(2,int(pos[2]))
hand.move(3,int(pos[3]))
hand.move(4,int(pos[4]))
hand.move(5,int(pos[5]))
hand.move(6,int(pos[6]))
hand.move(7,int(pos[7]))
hand.move(8,int(pos[8]))
hand.move(9,int(pos[9]))
return pos
def converter(pos): #Converter that is executed when called in callback
command_value = (((pos*-3500)/0.02)+8000)
return command_value
hand = ar10() # create hand object
listener() # start subscriber
def __init__(self):
self.joint_state = JointState()
self.arm_map = dict()
self.joint_pub = rospy.Publisher("/as_arm/set_joints_states", JointState, queue_size=1)
self.pose_sub = rospy.Subscriber("/as_arm/joint_states", JointState, callback=self.callback_joint, queue_size=1)
# end effector service
self.check_collision_client = rospy.ServiceProxy('/check_collision', CheckCollisionValid)
self.tf_listener = tf.TransformListener()
def __init__(self):
self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
'finger_joint1', 'finger_joint2']
# subscriber for setting joints position
self.joint_state = JointState()
self.states_pub = rospy.Publisher("/joint_states", JointState, queue_size=1)
print "======joint range:"
for name in self.joint_names[:5]:
v = self.JointMap[name]
v['range'][0] *= DEG_TO_RAD
v['range'][1] *= DEG_TO_RAD
print '\t', name, v['range']
def __init__(self):
self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
]
# subscriber for setting joints position
self.joint_state = JointState()
self.states_pub = rospy.Publisher("/as_arm/set_joints_states", JointState, queue_size=1)
print "======joint range:"
for name in self.joint_names[:5]:
v = self.JointMap[name]
v['range'][0] *= DEG_TO_RAD
v['range'][1] *= DEG_TO_RAD
print '\t', name, v['range']
def __init__(self):
self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
'finger_joint1', 'finger_joint2']
# subscriber for setting joints position
self.states_sub = rospy.Subscriber("/as_arm/set_joints_states", JointState, self.callback,
queue_size=2)
# a list of publisher
self.joint_pub = dict()
self.joint_pose = dict()
for idx, name in enumerate(self.joint_names):
pub = rospy.Publisher("/as_arm/joint%d_position_controller/command" % (idx + 1), Float64, queue_size=3)
self.joint_pub[name] = pub
self.joint_pose[name] = Float64()
def __init__(self):
self.joint_names = ['base_joint', 'shoulder_joint', 'elbow_joint', 'wrist_flex_joint', 'wrist_rot_joint',
]
# subscriber for setting joints position
self.joint_state = JointState()
self.states_pub = rospy.Publisher("/joint_states", JointState, queue_size=1)
self.pose_sub = rospy.Subscriber("/as_arm/joint_states", JointState, callback=self.callback_joint, queue_size=1)
print "======joint range:"
for name in self.joint_names[:5]:
v = self.JointMap[name]
v['range'][0] *= DEG_TO_RAD
v['range'][1] *= DEG_TO_RAD
print '\t', name, v['range']
def __init__(self):
self.odom = Odometry()
self.odom_received = False
self.odom_sub = rospy.Subscriber('state', Odometry, self.OdomCallBack)
self.thruster_received = False
self.thruster_sub = rospy.Subscriber('thruster_command', JointState, self.ThrusterCallBack)
def __init__(self, description='human_description', prefix='human', control=False):
self.description = description
self.robot_commander = RobotCommander(description)
if control:
self.joint_publisher = rospy.Publisher('/human/set_joint_values', JointState, queue_size=1)
self.groups = {}
self.groups['head'] = MoveGroupCommander('Head', description)
self.groups['right_arm'] = MoveGroupCommander('RightArm', description)
self.groups['left_arm'] = MoveGroupCommander('LeftArm', description)
self.groups['right_leg'] = MoveGroupCommander('RightLeg', description)
self.groups['left_leg'] = MoveGroupCommander('LeftLeg', description)
self.groups['upper_body'] = MoveGroupCommander('UpperBody', description)
self.groups['lower_body'] = MoveGroupCommander('LowerBody', description)
self.groups['whole_body'] = MoveGroupCommander('WholeBody', description)
# initialize end-effectors dict
self.end_effectors = {}
# fill both dict
for key, value in self.groups.iteritems():
self.end_effectors[key] = value.get_end_effector_link()
# add the list of end-effectors for bodies
self.end_effectors['upper_body'] = [self.end_effectors['head'],
self.end_effectors['right_arm'],
self.end_effectors['left_arm']]
self.end_effectors['lower_body'] = [self.end_effectors['right_leg'],
self.end_effectors['left_leg']]
self.end_effectors['whole_body'] = self.end_effectors['upper_body'] + self.end_effectors['lower_body']
self.prefix = prefix
self.urdf_reader = URDFReader()
rospy.wait_for_service('compute_fk')
self.compute_fk = rospy.ServiceProxy('compute_fk', GetPositionFK)
self.current_state = self.get_initial_state()
def inverse_kinematic(self, desired_poses, fixed_joints={}, tolerance=0.001, group_names='whole_body', seed=None):
def compute_ik_client():
rospy.wait_for_service('compute_human_ik')
try:
compute_ik = rospy.ServiceProxy('compute_human_ik', GetHumanIK)
res = compute_ik(poses, fixed_joint_state, tolerance, group_names, seed)
return res.joint_state
except rospy.ServiceException, e:
print "Service call failed: %s" % e
# in case of failure return T pose
return self.get_initial_state()
if seed is None:
seed = self.get_current_state()
if group_names is not list:
group_names = [group_names]
# convert the desired poses to PoseStamped
poses = []
for key, value in desired_poses.iteritems():
pose = transformations.list_to_pose(value)
pose.header.frame_id = key
poses.append(pose)
# convert the fixed joints to joint state
fixed_joint_state = JointState()
for key, value in fixed_joints.iteritems():
fixed_joint_state.name += [key]
fixed_joint_state.position += [value]
return compute_ik_client()
def jacobian(self, group_name, joint_state, use_quaternion=False, link=None, ref_point=None):
def compute_jacobian_srv():
rospy.wait_for_service('compute_jacobian')
try:
compute_jac = rospy.ServiceProxy('compute_jacobian', GetJacobian)
js = JointState()
js.name = self.get_joint_names(group_name)
js.position = self.extract_group_joints(group_name, joint_state)
p = Point(x=ref_point[0], y=ref_point[1], z=ref_point[2])
# call the service
res = compute_jac(group_name, link, js, p, use_quaternion)
# reorganize the jacobian
jac_array = np.array(res.jacobian).reshape((res.nb_rows, res.nb_cols))
# reorder the jacobian wrt to the joint state
ordered_jac = np.zeros((len(jac_array), len(joint_state.name)))
for i, n in enumerate(js.name):
ordered_jac[:, joint_state.name.index(n)] = jac_array[:, i]
return ordered_jac
except rospy.ServiceException, e:
print "Service call failed: %s" % e
# compute the jacobian only for chains
# if group_name not in ['right_arm', 'left_arm', 'head', 'right_leg', 'left_leg']:
# rospy.logerr('The Jacobian matrix can only be computed on kinematic chains')
# return []
# assign values
if link is None:
link = self.end_effectors[group_name]
if ref_point is None:
ref_point = [0, 0, 0]
# return the jacobian
return compute_jacobian_srv()
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(TRIAL_COM_TOPIC, TrialCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, TrialCommand, listen)
sub2 = rospy.Subscriber(RESULT_TOPIC, SampleResult, listen_report)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
tc = TrialCommand()
T = 1
tc.controller = get_lin_gauss_test(T=T)
tc.T = T
tc.frequency = 20.0
# NOTE: ordering of datatypes in state is determined by the order here
tc.state_datatypes = [JOINT_ANGLES, JOINT_VELOCITIES]
tc.obs_datatypes = tc.state_datatypes
tc.ee_points = EE_SITES.reshape(EE_SITES.size).tolist()
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(tc)
rospy.spin()
def __init__(self):
self.joint_name = rospy.get_param("~joint_name")
self.joint_state = JointState()
self.joint_state.name.append(self.joint_name)
self.joint_state.position.append(0.0)
self.joint_state.velocity.append(0.0)
self.joint_pub = rospy.Publisher("joint_states", JointState, queue_size=1)
self.command_sub = rospy.Subscriber("command", Float64,
self.command_callback, queue_size=1)
def getDictFromAngles(angles):
"""
Converts angles of a JointState() to a dictionary
:param angles (sensor_msgs.msg.JointState): angles to be converted
:return (dict): dictionary with angles
"""
d=dict(zip(angles.name,angles.position))
return collections.OrderedDict(sorted( d.items() ))
def getStrFromAngles(angles):
"""
Converts all angles of a JointState() to a printable string
:param angles (sensor_msgs.msg.JointState): JointState() angles to be converted
:return (string): string of the angles
"""
d=getDictFromAngles(angles)
return str( dict(d.items()))