def ik_quaternion(self, quaternion_pose):
"""Take a xyz qxqyqzqw stamped pose and convert it to joint angles
using IK. You can call self.limb.move_to_joint_positions to
move to those angles
"""
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None
python类ServiceException()的实例源码
demo_vision_poseest_pickplace.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
demo_graspsuccessExp.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def ik_quaternion(self, quaternion_pose):
"""Take a xyz qxqyqzqw stamped pose and convert it to joint angles
using IK. You can call self.limb.move_to_joint_positions to
move to those angles
"""
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None
def home(self):
addr = '/{}_driver/in/home_arm'.format(self.robot_type)
rospy.wait_for_service(addr)
try:
serv = rospy.ServiceProxy(addr, HomeArm)
rep = serv()
rospy.loginfo(rep)
except rospy.ServiceException as e:
rospy.loginfo("Service error {}".format(e))
def update_focus(self):
""" Updates focused interest."""
try:
interest_id = request.form['interestId']
self.services.set_focus(interest_id)
except ServiceException as e:
rospy.logerr("Cannot set focus. " + repr(e))
return '', 204
def time_travel(self):
""" Revert experiment state to a given point."""
try:
point = request.form['point']
self.services.set_iteration(point)
except ServiceException as e:
rospy.logerr("Cannot set iteration. " + repr(e))
return '', 204
def update_assessment(self):
""" Updates assessment. """
try:
assessment_id = request.form['assessmentId']
self.services.assess(assessment_id)
except ServiceException as e:
rospy.logerr("Cannot set assessment. " + repr(e))
return '', 204
def baxter_ik_move(self, rpy_pose):
# quaternion_pose = conversions.list_to_pose_stamped(rpy_pose, "base")
quaternion_pose = convert_pose_to_msg(rpy_pose, "base")
node = "ExternalTools/" + self.limb + "/PositionKinematicsNode/IKService"
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
hdr = Header(stamp=rospy.Time.now(), frame_id="base")
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException), error_message:
rospy.logerr("Service request failed: %r" % (error_message,))
print "ERROR - baxter_ik_move - Failed to append pose"
return 1
if (ik_response.isValid[0]):
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name, ik_response.joints[0].position))
# move limb
self.limb_interface.move_to_joint_positions(limb_joints)
else:
print "ERROR - baxter_ik_move - No valid joint configuration found"
return 2
self.getPose() #Store current pose into self.pose
print "Move Executed"
return -1
#Because we can't get full moveit_commander on the raspberry pi due to memory limitations, rewritten implementation of the required conversion function
def forward_kinematic(self, joint_state, base='base', links=None):
def compute_fk_client():
try:
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = self.prefix + '/base'
rs = RobotState()
rs.joint_state = joint_state
res = self.compute_fk(header, links, rs)
return res.pose_stamped
except rospy.ServiceException, e:
print "Service call failed: %s" % e
# in case of troubles return 0 pose stamped
return []
if links is None:
links = self.end_effectors['whole_body']
if type(links) is not list:
if links == "all":
links = self.get_link_names('whole_body')
else:
links = [links]
# check that the base is in links
if base != 'base' and base not in links:
links.append(base)
pose_stamped_list = compute_fk_client()
if not pose_stamped_list:
return {}
# transform it in a dict of poses
pose_dict = {}
if base != 'base':
tr_base = transformations.pose_to_list(pose_stamped_list[links.index(base)].pose)
inv_base = transformations.inverse_transform(tr_base)
for i in range(len(links)):
if links[i] != base:
tr = transformations.pose_to_list(pose_stamped_list[i].pose)
pose_dict[links[i]] = transformations.multiply_transform(inv_base, tr)
else:
for i in range(len(links)):
pose_dict[links[i]] = transformations.pose_to_list(pose_stamped_list[i].pose)
return pose_dict
def inverse_kinematic(self, desired_poses, fixed_joints={}, tolerance=0.001, group_names='whole_body', seed=None):
def compute_ik_client():
rospy.wait_for_service('compute_human_ik')
try:
compute_ik = rospy.ServiceProxy('compute_human_ik', GetHumanIK)
res = compute_ik(poses, fixed_joint_state, tolerance, group_names, seed)
return res.joint_state
except rospy.ServiceException, e:
print "Service call failed: %s" % e
# in case of failure return T pose
return self.get_initial_state()
if seed is None:
seed = self.get_current_state()
if group_names is not list:
group_names = [group_names]
# convert the desired poses to PoseStamped
poses = []
for key, value in desired_poses.iteritems():
pose = transformations.list_to_pose(value)
pose.header.frame_id = key
poses.append(pose)
# convert the fixed joints to joint state
fixed_joint_state = JointState()
for key, value in fixed_joints.iteritems():
fixed_joint_state.name += [key]
fixed_joint_state.position += [value]
return compute_ik_client()
def jacobian(self, group_name, joint_state, use_quaternion=False, link=None, ref_point=None):
def compute_jacobian_srv():
rospy.wait_for_service('compute_jacobian')
try:
compute_jac = rospy.ServiceProxy('compute_jacobian', GetJacobian)
js = JointState()
js.name = self.get_joint_names(group_name)
js.position = self.extract_group_joints(group_name, joint_state)
p = Point(x=ref_point[0], y=ref_point[1], z=ref_point[2])
# call the service
res = compute_jac(group_name, link, js, p, use_quaternion)
# reorganize the jacobian
jac_array = np.array(res.jacobian).reshape((res.nb_rows, res.nb_cols))
# reorder the jacobian wrt to the joint state
ordered_jac = np.zeros((len(jac_array), len(joint_state.name)))
for i, n in enumerate(js.name):
ordered_jac[:, joint_state.name.index(n)] = jac_array[:, i]
return ordered_jac
except rospy.ServiceException, e:
print "Service call failed: %s" % e
# compute the jacobian only for chains
# if group_name not in ['right_arm', 'left_arm', 'head', 'right_leg', 'left_leg']:
# rospy.logerr('The Jacobian matrix can only be computed on kinematic chains')
# return []
# assign values
if link is None:
link = self.end_effectors[group_name]
if ref_point is None:
ref_point = [0, 0, 0]
# return the jacobian
return compute_jacobian_srv()
def delete_gazebo_models():
# This will be called on ROS Exit, deleting Gazebo models
try:
delete_model = rospy.ServiceProxy('/gazebo/delete_model', DeleteModel)
resp_delete = delete_model("cafe_table")
resp_delete = delete_model("object")
except rospy.ServiceException, e:
rospy.loginfo("Delete Model service call failed: {0}".format(e))
def switch_motors(onoff):
rospy.wait_for_service('/switch_motors')
try:
p = rospy.ServiceProxy('/switch_motors', SwitchMotors)
res = p(onoff)
return res.accepted
except rospy.ServiceException, e:
print "Service call failed: %s"%e
else:
return False
def pos_control(left_hz,right_hz,time_ms):
rospy.wait_for_service('/put_motor_freqs')
try:
p = rospy.ServiceProxy('/put_motor_freqs', PutMotorFreqs)
res = p(left_hz,right_hz,time_ms)
return res.accepted
except rospy.ServiceException, e:
print "Service call failed: %s"%e
else:
return False
def get_ik_joints(target_x, target_y, initial_z, target_z, target_theta, n_steps):
ns = "ExternalTools/left/PositionKinematicsNode/IKService"
iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
ikreq = SolvePositionIKRequest(seed_mode=SolvePositionIKRequest.SEED_CURRENT)
hdr = Header(stamp=rospy.Time.now(), frame_id='base')
qx = np.sin(target_theta * 0.5)
qy = np.cos(target_theta * 0.5)
for z in np.arange(initial_z, target_z, (target_z-initial_z)/n_steps):
pose = PoseStamped(
header=hdr,
pose=Pose(
position=Point( x=target_x, y=target_y, z=z, ),
orientation=Quaternion( x=qx, y=qy, z=0., w=0., ),
),
)
ikreq.pose_stamp.append(pose)
try:
rospy.wait_for_service(ns, 5.0)
resp = iksvc(ikreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return 1
return [j for v, j in zip(resp.isValid, resp.joints) if v]
def getAnglesFromPose(self,pose):
if type(pose)==Pose:
goal=PoseStamped()
goal.header.frame_id="/base"
goal.pose=pose
else:
goal=pose
if not self.ik:
rospy.logerror("ERROR: Inverse Kinematics service was not loaded")
return None
goalstr="%f,%f,%f"%(goal.pose.position.x,goal.pose.position.y,goal.pose.position.z)
ikreq = SolvePositionIKRequest()
ikreq.pose_stamp.append(goal)
try:
resp = self.iksvc(ikreq)
if (resp.isValid[0]):
return resp.joints[0]
else:
rospy.logerr("FAILURE - No Valid Joint Solution Found for %s"%goalstr)
return None
except rospy.ServiceException,e :
rospy.loginfo("Service call failed: %s" % (e,))
return None
def joystickUpdated(data):
rospy.logfatal("Emergency stop butoon pressed!")
if data.buttons[0] == 1:
for e in emergencies:
try:
e()
except ServiceException:
pass
def set_mode(self, mode):
if not self.current_state.connected:
print "No FCU connection"
elif self.current_state.mode == mode:
print "Already in " + mode + " mode"
else:
# wait for service
rospy.wait_for_service("mavros/set_mode")
# service client
set_mode = rospy.ServiceProxy("mavros/set_mode", SetMode)
# set request object
req = SetModeRequest()
req.custom_mode = mode
# zero time
t0 = rospy.get_time()
# check response
while not rospy.is_shutdown() and (self.current_state.mode != req.custom_mode):
if rospy.get_time() - t0 > 2.0: # check every 5 seconds
try:
# request
set_mode.call(req)
except rospy.ServiceException, e:
print "Service did not process request: %s"%str(e)
t0 = rospy.get_time()
print "Mode: "+self.current_state.mode + " established"
def go_to_pose(lmb, pose_msg, timeout=15.0):
"""
Given a limb (right or left) and a desired pose as a stamped message,
run inverse kinematics to attempt to find a joint configuration to yield
the pose and then move limb to the configuration.
After 5 seconds of attempts to solve the IK, raise an exception.
"""
node = "ExternalTools/" + lmb + "/PositionKinematicsNode/IKService"
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(pose_msg)
# Allow 5 seconds to find an IK solution
try:
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return 1
# Check if result valid
if ik_response.isValid[0]:
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
# send limb to joint positions
baxter_interface.limb.Limb(lmb).move_to_joint_positions(
limb_joints, timeout=timeout)
else:
raise Exception("Failed to find valid joint configuration.")
def __handler(self, server, service_name, proxy, req):
time_start = timer()
client = req._connection_header['callerid']
# generate a JSON-encodable description of the parameters for this request
# TODO: will fail with complex, embedded objects
params = {p: getattr(req, p) for p in req.__slots__}
# send the request and wait for a response
success = False
try:
ret = proxy(req)
success = True
response = {p: getattr(ret, p) for p in ret.__slots__}
except rospy.ServiceException, e:
success = False
response = {'reason': e}
# log the service call
finally:
time_end = timer()
time_duration = time_end - time_start
log = {
'service': service_name,
'server': server,
'client': client,
'time_start': time_start,
'time_end': time_end,
'time_duration': time_duration,
'params': params,
'response': response,
'success': success
}
serviceCallPublisher.publish(json.dumps(log))
return ret
def delete(self):
if not self.spawned: return
try:
rospy.loginfo("Deleting "+self.gazebo_name)
GazeboObject.delete_model_srv.wait_for_service()
resp_delete = GazeboObject.delete_model_srv(self.gazebo_name)
self.spawned = False
except rospy.ServiceException, e:
pass # Don't know why, but an exception is raised by ROS whereas the deletion is actually successful ... So ignore the exception