def ik_solve(limb, pos, orient):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/IKService"
iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
ikreq = SolvePositionIKRequest()
hdr = Header(stamp=rospy.Time.now(), frame_id='base')
poses = {
str(limb): PoseStamped(header=hdr, pose=Pose(position=pos, orientation=orient))}
ikreq.pose_stamp.append(poses[limb])
try:
rospy.wait_for_service(ns, 5.0)
resp = iksvc(ikreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return 1
if resp.isValid[0]:
limb_joints = dict(zip(resp.joints[0].name, resp.joints[0].position))
return limb_joints
else:
return Errors.RaiseNotReachable(pos)
python类ServiceException()的实例源码
def save_to_memory(self, perception_name, data={}):
if data == {}:
rospy.logwarn('Empty data inserted...')
return
for item in data.keys():
if not item in self.conf_data[perception_name]['data'].keys():
rospy.logwarn('[%s -- wrong data inserted [%s]...'%(perception_name, item))
return
srv_req = WriteDataRequest()
srv_req.perception_name = perception_name
srv_req.data = json.dumps(data)
srv_req.by = rospy.get_name()
target_memory = self.conf_data[perception_name]['target_memory']
try:
rospy.wait_for_service('/%s/write_data'%target_memory)
self.dict_srv_wr[target_memory](srv_req)
except rospy.ServiceException as e:
pass
def compute_static_stability_thread():
rate = rospy.Rate(60)
global kron_com_vertices
while not rospy.is_shutdown():
if 'COM-static' in support_area_handles \
and not gui.show_com_support_area:
delete_support_area_display('COM-static')
if not motion_plan or not motion_plan.started \
or 'COM-static' in support_area_handles \
or not gui.show_com_support_area:
rate.sleep()
continue
color = (0.1, 0.1, 0.1, 0.5)
req = contact_stability.srv.StaticAreaRequest(
contacts=convert_contacts_to_ros(motion_plan.cur_stance.contacts),
mass=robot.mass, z_out=motion_plan.com_height)
try:
res = compute_com_area(req)
vertices = [array([v.x, v.y, v.z]) for v in res.vertices]
update_support_area_display('COM-static', vertices, [], color)
except rospy.ServiceException:
delete_support_area_display('COM-static')
rate.sleep()
def request(self, uavcan_msg, node_id, callback, priority=None, timeout=None):
#pylint: disable=W0613
uavcan_type = uavcan.get_uavcan_data_type(uavcan_msg)
if not uavcan_type.full_name in self.__service_proxies:
self.__service_proxies[uavcan_type.full_name] = Service(uavcan_type.full_name).ServiceProxy()
service_proxy = self.__service_proxies[uavcan_type.full_name]
ros_req = copy_uavcan_ros(service_proxy.request_class(), uavcan_msg, request=True)
setattr(ros_req, uavcan_id_field_name, node_id)
def service_proxy_call():
try:
return service_proxy(ros_req)
except rospy.ServiceException:
return None
def request_finished(fut):
ros_resp = fut.result()
if ros_resp is None:
uavcan_resp = None
else:
uavcan_resp = uavcan_node._Event(self, uavcan_type, ros_resp, uavcan_id=node_id, request=False)
callback(uavcan_resp)
self.__request_executor.submit(service_proxy_call).add_done_callback(request_finished)
def init_traj(self):
try:
# self.recorder.init_traj(itr)
if self.use_goalimage:
goal_img_main, goal_state = self.load_goalimage()
goal_img_aux1 = np.zeros([64, 64, 3])
else:
goal_img_main = np.zeros([64, 64, 3])
goal_img_aux1 = np.zeros([64, 64, 3])
goal_img_main = self.bridge.cv2_to_imgmsg(goal_img_main)
goal_img_aux1 = self.bridge.cv2_to_imgmsg(goal_img_aux1)
rospy.wait_for_service('init_traj_visualmpc', timeout=1)
self.init_traj_visual_func(0, 0, goal_img_main, goal_img_aux1, self.save_subdir)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
def save(self, i_save, action, endeffector_pose):
self.t_savereq = rospy.get_time()
assert self.instance_type == 'main'
if self.use_aux:
# request save at auxiliary recorders
try:
rospy.wait_for_service('get_kinectdata', 0.1)
resp1 = self.save_kinectdata_func(i_save)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
if self.save_images:
self._save_img_local(i_save)
if self.save_actions:
self._save_state_actions(i_save, action, endeffector_pose)
if self.save_gif:
highres = cv2.cvtColor(self.ltob.img_cv2, cv2.COLOR_BGR2RGB)
print 'highres dim',highres.shape
self.highres_imglist.append(highres)
gripperalignment.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def ik_quaternion(self, quaternion_pose):
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None
def _arm(self):
print(self.namespace, 'arming')
service_name = '%s/mavros/cmd/arming' % self.namespace
rospy.wait_for_service(service_name)
try:
service = rospy.ServiceProxy(service_name, CommandBool)
resp = service(True)
except rospy.ServiceException as e:
print(self.namespace, 'service call to arm failed:', str(e),
file=sys.stderr)
return False
if not resp.success:
print(self.namespace, 'failed to arm', file=sys.stderr)
return False
print(self.namespace, 'armed')
return True
def _return_home(self):
print(self.namespace, 'land')
req = CommandTOLRequest()
req.min_pitch = 0.0
req.yaw = -math.pi if self.color == 'blue' else 0.0
req.latitude = self.start_position.latitude
req.longitude = self.start_position.longitude
req.altitude = self.start_position.altitude
service_name = '%s/mavros/cmd/land' % self.namespace
rospy.wait_for_service(service_name)
try:
service = rospy.ServiceProxy(service_name, CommandTOL)
resp = service.call(req)
except rospy.ServiceException as e:
print(self.namespace, 'service call to land failed:', str(e),
file=sys.stderr)
return False
if not resp.success:
print(self.namespace, 'failed to land', file=sys.stderr)
return False
print(self.namespace, 'landing')
self._set_state(STATE_LANDING)
return True
def compute_fk_client(self, group, joint_values, links):
rospy.wait_for_service('compute_fk')
try:
compute_fk = rospy.ServiceProxy('compute_fk', GetPositionFK)
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = group.get_pose_reference_frame()
rs = RobotState()
rs.joint_state.header = header
rs.joint_state.name = group.get_active_joints()
rs.joint_state.position = joint_values
res = compute_fk(header, links, rs)
return res.pose_stamped
except rospy.ServiceException, e:
print "Service call failed: %s" % e
def get_service_types(self, services):
service_types = []
for service in services:
if service:
try:
service_name_str = str(service[0])
service_type_str = rosservice.get_service_type(service_name_str)
if service_type_str is not None:
service_types.append([service_name_str, service_type_str])
except rospy.ServiceException as e:
rospy.logerr("Information is invalid for the service : %s . %s" % (service_name_str, e))
continue
except rospy.ServiceIOException as e:
rospy.logerr("Unable to communicate with service : %s . %s" % (service_name_str, e))
continue
return service_types
def publish_to_ros(self, mode='transform', service_name='rigid_transforms/rigid_transform_publisher', namespace=None):
"""Publishes RigidTransform to ROS
If a transform referencing the same frames already exists in the ROS publisher, it is updated instead.
This checking is not order sensitive
Requires ROS rigid_transform_publisher service to be running. Assuming autolab_core is installed as a catkin package,
this can be done with: roslaunch autolab_core rigid_transforms.launch
Parameters
----------
mode : :obj:`str`
Mode in which to publish. In {'transform', 'frame'}
Defaults to 'transform'
service_name : string, optional
RigidTransformPublisher service to interface with. If the RigidTransformPublisher services are started through
rigid_transforms.launch it will be called rigid_transform_publisher
namespace : string, optional
Namespace to prepend to transform_listener_service. If None, current namespace is prepended.
Raises
------
rospy.ServiceException
If service call to rigid_transform_publisher fails
"""
if namespace == None:
service_name = rospy.get_namespace() + service_name
else:
service_name = namespace + service_name
rospy.wait_for_service(service_name, timeout = 10)
publisher = rospy.ServiceProxy(service_name, RigidTransformPublisher)
trans = self.translation
rot = self.quaternion
publisher(trans[0], trans[1], trans[2], rot[0], rot[1], rot[2], rot[3], self.from_frame, self.to_frame, mode)
def delete_from_ros(self, service_name='rigid_transforms/rigid_transform_publisher', namespace=None):
"""Removes RigidTransform referencing from_frame and to_frame from ROS publisher.
Note that this may not be this exact transform, but may that references the same frames (order doesn't matter)
Also, note that it may take quite a while for the transform to disappear from rigid_transform_publisher's cache
Requires ROS rigid_transform_publisher service to be running. Assuming autolab_core is installed as a catkin package,
this can be done with: roslaunch autolab_core rigid_transforms.launch
Parameters
----------
service_name : string, optional
RigidTransformPublisher service to interface with. If the RigidTransformPublisher services are started through
rigid_transforms.launch it will be called rigid_transform_publisher
namespace : string, optional
Namespace to prepend to transform_listener_service. If None, current namespace is prepended.
Raises
------
rospy.ServiceException
If service call to rigid_transform_publisher fails
"""
if namespace == None:
service_name = rospy.get_namespace() + service_name
else:
service_name = namespace + service_name
rospy.wait_for_service(service_name, timeout = 10)
publisher = rospy.ServiceProxy(service_name, RigidTransformPublisher)
publisher(0, 0, 0, 0, 0, 0, 0, self.from_frame, self.to_frame, 'delete')
def rigid_transform_from_ros(from_frame, to_frame, service_name='rigid_transforms/rigid_transform_listener', namespace=None):
"""Gets transform from ROS as a rigid transform
Requires ROS rigid_transform_publisher service to be running. Assuming autolab_core is installed as a catkin package,
this can be done with: roslaunch autolab_core rigid_transforms.launch
Parameters
----------
from_frame : :obj:`str`
to_frame : :obj:`str`
service_name : string, optional
RigidTransformListener service to interface with. If the RigidTransformListener services are started through
rigid_transforms.launch it will be called rigid_transform_listener
namespace : string, optional
Namespace to prepend to transform_listener_service. If None, current namespace is prepended.
Raises
------
rospy.ServiceException
If service call to rigid_transform_listener fails
"""
if namespace == None:
service_name = rospy.get_namespace() + service_name
else:
service_name = namespace + service_name
rospy.wait_for_service(service_name, timeout = 10)
listener = rospy.ServiceProxy(service_name, RigidTransformListener)
ret = listener(from_frame, to_frame)
quat = np.asarray([ret.w_rot, ret.x_rot, ret.y_rot, ret.z_rot])
trans = np.asarray([ret.x_trans, ret.y_trans, ret.z_trans])
rot = RigidTransform.rotation_from_quaternion(quat)
return RigidTransform(rotation=rot, translation=trans, from_frame=from_frame, to_frame=to_frame)
def init_rovio_button_handler(self):
try:
response = self.init_rovio_srv()
self.init_message_label['text'] = response.message
except rospy.ServiceException, e:
message = 'Service call to reset Rovio internal state failed: %s' % e
self.init_message_label['text'] = message
def compute_calibration(self):
"""
Computes the calibration through the ViSP service and returns it.
:rtype: easy_handeye.handeye_calibration.HandeyeCalibration
"""
if len(self.samples) < HandeyeCalibrator.MIN_SAMPLES:
rospy.logwarn("{} more samples needed! Not computing the calibration".format(
HandeyeCalibrator.MIN_SAMPLES - len(self.samples)))
return
# Update data
hand_world_samples, camera_marker_samples = self.get_visp_samples()
if len(hand_world_samples.transforms) != len(camera_marker_samples.transforms):
rospy.logerr("Different numbers of hand-world and camera-marker samples!")
raise AssertionError
rospy.loginfo("Computing from %g poses..." % len(self.samples))
try:
result = self.calibrate(camera_marker_samples, hand_world_samples)
rospy.loginfo("Computed calibration: {}".format(str(result)))
transl = result.effector_camera.translation
rot = result.effector_camera.rotation
result_tuple = ((transl.x, transl.y, transl.z),
(rot.x, rot.y, rot.z, rot.w))
ret = HandeyeCalibration(self.eye_on_hand,
self.robot_base_frame,
self.robot_effector_frame,
self.tracking_base_frame,
result_tuple)
return ret
except rospy.ServiceException as ex:
rospy.logerr("Calibration failed: " + str(ex))
return None
def fk_service_client(limb = "right"):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService"
fksvc = rospy.ServiceProxy(ns, SolvePositionFK)
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3',
'right_j4', 'right_j5', 'right_j6']
joints.position = [0.763331, 0.415979, -1.728629, 1.482985,
-1.135621, -1.674347, -0.496337]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
# Request forward kinematics from base to "right_hand" link
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(ns, 5.0)
resp = fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
# Check if result valid
if (resp.isValid[0]):
rospy.loginfo("SUCCESS - Valid Cartesian Solution Found")
rospy.loginfo("\nFK Cartesian Solution:\n")
rospy.loginfo("------------------")
rospy.loginfo("Response Message:\n%s", resp)
else:
rospy.logerr("INVALID JOINTS - No Cartesian Solution Found.")
return False
return True
def recognize_srv_callback(self, req):
"""
Method callback for the Recognize.srv
:param req: The service request
"""
self._response.recognitions = []
self._recognizing = True
try:
cv_image = self.bridge.imgmsg_to_cv2(req.image, "bgr8")
except CvBridgeError as e:
rospy.logerr(e)
self._image_widget.set_image(cv_image)
self._done_recognizing_button.setDisabled(False)
timeout = 60.0 # Maximum of 60 seconds
future = rospy.Time.now() + rospy.Duration(timeout)
rospy.loginfo("Waiting for manual recognition, maximum of %d seconds", timeout)
while not rospy.is_shutdown() and self._recognizing:
if rospy.Time.now() > future:
raise rospy.ServiceException("Timeout of %d seconds exceeded .." % timeout)
rospy.sleep(rospy.Duration(0.1))
self._done_recognizing_button.setDisabled(True)
return self._response
def __getattr__(self, name):
""" Override the __getattr__ method so that function calls become server requests
If the name is a method of the YuMiArm class, this returns a function that calls that
function on the YuMiArm instance in the server. The wait_for_res argument is not available
remotely and will always be set to True. This is to prevent odd desynchronized crashes
Otherwise, the name is considered to be an attribute, and getattr is called on the
YuMiArm instance in the server. Note that if it isn't an attribute either a RuntimeError
will be raised.
The difference here is that functions access the server *on call* and non-functions do
*on getting the name*
Also note that this is __getattr__, so things like __init__ and __dict__ WILL NOT trigger
this function as the YuMiArm_ROS object already has these as attributes.
"""
if name in YuMiArm.__dict__:
def handle_remote_call(*args, **kwargs):
""" Handle the remote call to some YuMiArm function.
"""
rospy.wait_for_service(self.arm_service, timeout = self.timeout)
arm = rospy.ServiceProxy(self.arm_service, ROSYumiArm)
if 'wait_for_res' in kwargs:
kwargs['wait_for_res'] = True
try:
response = arm(pickle.dumps(name), pickle.dumps(args), pickle.dumps(kwargs))
except rospy.ServiceException, e:
raise RuntimeError("Service call failed: {0}".format(str(e)))
return pickle.loads(response.ret)
return handle_remote_call
else:
rospy.wait_for_service(self.arm_service, timeout = self.timeout)
arm = rospy.ServiceProxy(self.arm_service, ROSYumiArm)
try:
response = arm(pickle.dumps('__getattribute__'), pickle.dumps(name), pickle.dumps(None))
except rospy.ServiceException, e:
raise RuntimeError("Could not get attribute: {0}".format(str(e)))
return pickle.loads(response.ret)
def perform_service_call(service_name):
"""
POST /api/<version>/service/<service_name>
POST to a service to change it somehow. service_name may be a path.
For example, to start an environmental recipe in an environment, use the
start_recipe service:
POST /api/<version>/service/<environment_id>/start_recipe {"recipe_id": <id>}
"""
# Add leading slash to service_name. ROS qualifies all services with a
# leading slash.
service_name = '/' + service_name
# Find the class that ROS autogenerates from srv files that is associated
# with this service.
try:
ServiceClass = rosservice.get_service_class_by_name(service_name)
except rosservice.ROSServiceException as e:
return error(e)
# If we made it this far, the service exists.
# Wait for service to be ready before attempting to use it.
try:
rospy.wait_for_service(service_name, 1)
except rospy.ROSException as e:
raise socket.error()
# Get JSON args if they exist. If they don't, treat as if empty array
# was passed.
json = request.get_json(silent=True)
args = json if json else []
service_proxy = rospy.ServiceProxy(service_name, ServiceClass)
try:
# Unpack the list of args and pass to service_proxy function.
res = service_proxy(*args)
except (rospy.ServiceException, TypeError) as e:
return error(e)
status_code = 200 if getattr(res, "success", True) else 400
data = {k: getattr(res, k) for k in res.__slots__}
return jsonify(data), status_code
def compute_zmp_support_area_ros(contacts, p_in, z_out, name, color):
req = contact_stability.srv.PendularAreaRequest(
contacts=convert_contacts_to_ros(contacts),
mass=robot.mass,
p_in=geometry_msgs.msg.Point(p_in[0], p_in[1], p_in[2]),
z_out=z_out)
try:
# This commented block was used to gather computation times reported in
# Table III of the paper.
#
# if False:
# t0 = time.time()
# res = compute_support_area(req)
# if False:
# G = contacts.compute_wrench_cone(p_in)
# t0 = time.time()
# f = numpy.array([0., 0., robot.mass * 9.81])
# tau = zeros(3)
# wrench = numpy.hstack([f, tau])
# check = all(dot(G, wrench) <= 0)
# if False:
# t0 = time.time()
# contacts.find_static_supporting_forces(p_in, robot.mass)
# area_comp_times[contacts.nb_contacts].append(time.time() - t0)
res = compute_support_area(req)
return convert_ros_response(res)
except rospy.ServiceException as e:
rospy.logwarn(str(e))
return [], []
def UAVCAN_Subscribe(self):
def handler(event):
ros_req = canros.copy_uavcan_ros(self.Request_Type(), event.request, request=True)
setattr(ros_req, canros.uavcan_id_field_name, event.transfer.source_node_id)
try:
ros_resp = self.ROS_ServiceProxy(ros_req)
except rospy.ServiceException:
return
return canros.copy_ros_uavcan(self.UAVCAN_Type.Response(), ros_resp, request=False)
uavcan_node.add_handler(self.UAVCAN_Type, handler)
def __get_node_info(self):
if monotonic.monotonic() - self.__node_info_last >= 1:
self.__node_info_last = monotonic.monotonic()
try:
self.__node_info = self.__node_info_sp().node_info
except rospy.ServiceException:
raise Exception("Could not connect to canros server.")
return self.__node_info
def robot_inference_client(x):
rospy.wait_for_service('inference')
try:
infer = rospy.ServiceProxy('inference', Inference)
resp1 = infer(x)
return resp1.inference
except rospy.ServiceException, e:
print ("Service call failed: %s"%e)
def get_endeffector_pos(self, pos_only=True):
"""
:param pos_only: only return postion
:return:
"""
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = self.ctrl.limb.joint_names()
joints.position = [self.ctrl.limb.joint_angle(j)
for j in joints.name]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(self.name_of_service, 5)
resp = self.fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
pos = np.array([resp.pose_stamp[0].pose.position.x,
resp.pose_stamp[0].pose.position.y,
resp.pose_stamp[0].pose.position.z])
return pos
def query_action(self):
if self.use_robot:
if self.use_aux:
self.recorder.get_aux_img()
imageaux1 = self.recorder.ltob_aux1.img_msg
else:
imageaux1 = np.zeros((64, 64, 3), dtype=np.uint8)
imageaux1 = self.bridge.cv2_to_imgmsg(imageaux1)
imagemain = self.bridge.cv2_to_imgmsg(self.recorder.ltob.img_cropped)
state = self.get_endeffector_pos()
else:
imagemain = np.zeros((64,64,3))
imagemain = self.bridge.cv2_to_imgmsg(imagemain)
imageaux1 = self.bridge.cv2_to_imgmsg(self.test_img)
state = np.zeros(3)
try:
rospy.wait_for_service('get_action', timeout=240)
get_action_resp = self.get_action_func(imagemain, imageaux1,
tuple(state),
tuple(self.desig_pos_main.flatten()),
tuple(self.goal_pos_main.flatten()))
action_vec = get_action_resp.action
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get action service call failed')
return action_vec
def fk_service_client(limb = "right"):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService"
fksvc = rospy.ServiceProxy(ns, SolvePositionFK)
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3',
'right_j4', 'right_j5', 'right_j6']
joints.position = [0.763331, 0.415979, -1.728629, 1.482985,
-1.135621, -1.674347, -0.496337]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
# Request forward kinematics from base to "right_hand" link
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(ns, 5.0)
resp = fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
# Check if result valid
if (resp.isValid[0]):
rospy.loginfo("SUCCESS - Valid Cartesian Solution Found")
rospy.loginfo("\nFK Cartesian Solution:\n")
rospy.loginfo("------------------")
rospy.loginfo("Response Message:\n%s", resp)
else:
rospy.loginfo("INVALID JOINTS - No Cartesian Solution Found.")
return True
def init_traj(self, itr):
assert self.instance_type == 'main'
# request init service for auxiliary recorders
if self.use_aux:
try:
rospy.wait_for_service('init_traj', timeout=1)
resp1 = self.init_traj_func(itr, self.igrp)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
self._init_traj_local(itr)
if ((itr+1) % self.ngroup) == 0:
self.igrp += 1
def get_aux_img(self):
try:
rospy.wait_for_service('get_kinectdata', 0.1)
resp1 = self.get_kinectdata_func()
self.ltob_aux1.img_msg = resp1.image
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
demo_graspEventdetect.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def ik_quaternion(self, quaternion_pose):
"""Take a xyz qxqyqzqw stamped pose and convert it to joint angles
using IK. You can call self.limb.move_to_joint_positions to
move to those angles
"""
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None