def perform_service_call(service_name):
"""
POST /api/<version>/service/<service_name>
POST to a service to change it somehow. service_name may be a path.
For example, to start an environmental recipe in an environment, use the
start_recipe service:
POST /api/<version>/service/<environment_id>/start_recipe {"recipe_id": <id>}
"""
# Add leading slash to service_name. ROS qualifies all services with a
# leading slash.
service_name = '/' + service_name
# Find the class that ROS autogenerates from srv files that is associated
# with this service.
try:
ServiceClass = rosservice.get_service_class_by_name(service_name)
except rosservice.ROSServiceException as e:
return error(e)
# If we made it this far, the service exists.
# Wait for service to be ready before attempting to use it.
try:
rospy.wait_for_service(service_name, 1)
except rospy.ROSException as e:
raise socket.error()
# Get JSON args if they exist. If they don't, treat as if empty array
# was passed.
json = request.get_json(silent=True)
args = json if json else []
service_proxy = rospy.ServiceProxy(service_name, ServiceClass)
try:
# Unpack the list of args and pass to service_proxy function.
res = service_proxy(*args)
except (rospy.ServiceException, TypeError) as e:
return error(e)
status_code = 200 if getattr(res, "success", True) else 400
data = {k: getattr(res, k) for k in res.__slots__}
return jsonify(data), status_code
python类wait_for_service()的实例源码
def call_service(service_name, result_msg, result_value, *args):
# Add leading slash to service_name. ROS qualifies all services with a
# leading slash.
# Find the class that ROS autogenerates from srv files that is associated
# with this service.
ServiceClass = rosservice.get_service_class_by_name(service_name)
rospy.wait_for_service(service_name, 1)
service_proxy = rospy.ServiceProxy(service_name, ServiceClass)
res = service_proxy(*args)
print(service_name)
print("-----------######-------------")
print(res)
assert getattr(res, "success", result_value)
#assert res
def __init__(self):
rospy.wait_for_service("/next_phase")
self.nextPhase = rospy.ServiceProxy("/next_phase", Empty)
def __init__(self):
rospy.init_node("CrazyflieAPI", anonymous=False)
rospy.wait_for_service("/emergency")
self.emergencyService = rospy.ServiceProxy("/emergency", Empty)
rospy.wait_for_service("/takeoff")
self.takeoffService = rospy.ServiceProxy("/takeoff", Takeoff)
rospy.wait_for_service("/land")
self.landService = rospy.ServiceProxy("/land", Land)
rospy.wait_for_service("/start_trajectory");
self.startTrajectoryService = rospy.ServiceProxy("/start_trajectory", StartTrajectory)
rospy.wait_for_service("/start_trajectory_reversed");
self.startTrajectoryReversedService = rospy.ServiceProxy("/start_trajectory_reversed", StartTrajectoryReversed)
rospy.wait_for_service("/start_ellipse")
self.ellipseService = rospy.ServiceProxy("/start_ellipse", StartEllipse)
rospy.wait_for_service("/start_canned_trajectory")
self.startCannedTrajectoryService = rospy.ServiceProxy("/start_canned_trajectory", StartCannedTrajectory)
rospy.wait_for_service("/go_home");
self.goHomeService = rospy.ServiceProxy("/go_home", GoHome)
rospy.wait_for_service("/update_params")
self.updateParamsService = rospy.ServiceProxy("/update_params", UpdateParams)
with open("../launch/crazyflies.yaml", 'r') as ymlfile:
cfg = yaml.load(ymlfile)
self.tf = TransformListener()
self.crazyflies = []
self.crazyfliesById = dict()
for crazyflie in cfg["crazyflies"]:
id = int(crazyflie["id"])
initialPosition = crazyflie["initialPosition"]
cf = Crazyflie(id, initialPosition, self.tf)
self.crazyflies.append(cf)
self.crazyfliesById[id] = cf
def init_ros():
global compute_com_area
global compute_support_area
rospy.init_node('real_time_control')
rospy.wait_for_service(
'/contact_stability/static/compute_support_area')
rospy.wait_for_service(
'/contact_stability/pendular/compute_support_area')
compute_com_area = rospy.ServiceProxy(
'/contact_stability/static/compute_support_area',
contact_stability.srv.StaticArea)
compute_support_area = rospy.ServiceProxy(
'/contact_stability/pendular/compute_support_area',
contact_stability.srv.PendularArea)
def test_services(self):
'''
Test varius services that aren'ts tested individually
'''
rospy.wait_for_service("mytask_step_name")
proxy = rospy.ServiceProxy(
"mytask_step_name",
Message
)
response = proxy(MessageRequest())
self.assertEqual(response.value, 'load')
proxy = rospy.ServiceProxy(
"mytask_task_payload",
TaskPayload
)
response = proxy()
self.assertEqual(response.status, TaskStatusRequest.RUNNING)
self.assertFalse(response.did_fail)
proxy = rospy.ServiceProxy(
"mytask_next_step",
NextStep
)
response = proxy(status=TaskStepStatus.SUCCESS)
self.assertEqual(response.name, 'say_hello')
response = proxy(status=TaskStepStatus.FAILURE)
self.assertEqual(response.name, 'abort')
def test_status(self):
rospy.wait_for_service("mytask_status")
proxy = rospy.ServiceProxy(
"mytask_status",
TaskStatus
)
response = proxy(TaskStatusRequest())
self.assertEqual(response.status, TaskStatusRequest.RUNNING)
def service_builder(self, service_def):
"""
Method for generating service endpoints for this Client.
Each service contains a type and a service definition, as
defined in the service's .srv file.
For each service, a method is attached to the client that provides an
endpoint with which other nodes can interact with the service.
Because services must be uniquely namespaced, each service proxy will be
exposed externally omitting the class's namespace as defined at
construction time, but mapping to a service that uses the service
class's namepspace. This means service proxies map as follows
self.method('some message') -> /self.namespace_method
Args:
service (string): the name of the service
"""
service_name, service_type = service_def
namespaced_service = '{}_{}'.format(self.namespace, service_name)
def service_method(self, *args, **kwargs):
rospy.wait_for_service(namespaced_service)
service_proxy = rospy.ServiceProxy(namespaced_service, service_type)
return service_proxy(self, *args, **kwargs)
setattr(self, service_name, service_method)
def run(self):
""" Run our code """
rospy.loginfo("Start laser test code")
rospy.wait_for_service("assemble_scans2")
# Todo - publish the spin logic...
self.laser_publisher = rospy.Publisher("/multisense/lidar_cloud", PointCloud, queue_size=10)
self.scan_service = rospy.ServiceProxy('assemble_scans', AssembleScans)
#self.bridge = CvBridge()
#self.zarj_os.zps.look_down()
while True:
begin = rospy.get_rostime()
rospy.sleep(3.0)
resp = self.scan_service(begin, rospy.get_rostime())
self.laser_publisher.publish(resp.cloud)
#print "Got cloud {}x{}".format(resp.cloud.height, resp.cloud.width)
img = self.create_image_from_cloud(resp.cloud.points)
cv2.destroyAllWindows()
print "New image"
cv2.imshow("My image", img)
cv2.waitKey(1)
#print resp
#cv_image = self.bridge.imgmsg_to_cv2(resp.cloud, "bgr8")
#cv2.imshow("Point cloud", cv_image)
def get_service(self):
"""
Returns the service which then can be called.
!! Does not call the service.
:return: the service.
"""
rospy.wait_for_service(self.SERVICE_CHANNEL)
return rospy.ServiceProxy(self.SERVICE_CHANNEL, self.service_class)
def call_service(self, **kwargs):
"""
Calls the service.
:param kwargs: Arguments of the service.
:return: Result of the service call.
"""
rospy.wait_for_service(self.SERVICE_CHANNEL)
return rospy.ServiceProxy(self.SERVICE_CHANNEL, self.service_class)(**kwargs)
# Create a list of all Services that are implemented
def robot_inference_client(x):
rospy.wait_for_service('inference')
try:
infer = rospy.ServiceProxy('inference', Inference)
resp1 = infer(x)
return resp1.inference
except rospy.ServiceException, e:
print ("Service call failed: %s"%e)
def get_endeffector_pos(self, pos_only=True):
"""
:param pos_only: only return postion
:return:
"""
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = self.ctrl.limb.joint_names()
joints.position = [self.ctrl.limb.joint_angle(j)
for j in joints.name]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(self.name_of_service, 5)
resp = self.fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
pos = np.array([resp.pose_stamp[0].pose.position.x,
resp.pose_stamp[0].pose.position.y,
resp.pose_stamp[0].pose.position.z])
return pos
def query_action(self):
if self.use_robot:
if self.use_aux:
self.recorder.get_aux_img()
imageaux1 = self.recorder.ltob_aux1.img_msg
else:
imageaux1 = np.zeros((64, 64, 3), dtype=np.uint8)
imageaux1 = self.bridge.cv2_to_imgmsg(imageaux1)
imagemain = self.bridge.cv2_to_imgmsg(self.recorder.ltob.img_cropped)
state = self.get_endeffector_pos()
else:
imagemain = np.zeros((64,64,3))
imagemain = self.bridge.cv2_to_imgmsg(imagemain)
imageaux1 = self.bridge.cv2_to_imgmsg(self.test_img)
state = np.zeros(3)
try:
rospy.wait_for_service('get_action', timeout=240)
get_action_resp = self.get_action_func(imagemain, imageaux1,
tuple(state),
tuple(self.desig_pos_main.flatten()),
tuple(self.goal_pos_main.flatten()))
action_vec = get_action_resp.action
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get action service call failed')
return action_vec
def fk_service_client(limb = "right"):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService"
fksvc = rospy.ServiceProxy(ns, SolvePositionFK)
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3',
'right_j4', 'right_j5', 'right_j6']
joints.position = [0.763331, 0.415979, -1.728629, 1.482985,
-1.135621, -1.674347, -0.496337]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
# Request forward kinematics from base to "right_hand" link
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(ns, 5.0)
resp = fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
# Check if result valid
if (resp.isValid[0]):
rospy.loginfo("SUCCESS - Valid Cartesian Solution Found")
rospy.loginfo("\nFK Cartesian Solution:\n")
rospy.loginfo("------------------")
rospy.loginfo("Response Message:\n%s", resp)
else:
rospy.loginfo("INVALID JOINTS - No Cartesian Solution Found.")
return True
def init_traj(self, itr):
assert self.instance_type == 'main'
# request init service for auxiliary recorders
if self.use_aux:
try:
rospy.wait_for_service('init_traj', timeout=1)
resp1 = self.init_traj_func(itr, self.igrp)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
self._init_traj_local(itr)
if ((itr+1) % self.ngroup) == 0:
self.igrp += 1
def get_aux_img(self):
try:
rospy.wait_for_service('get_kinectdata', 0.1)
resp1 = self.get_kinectdata_func()
self.ltob_aux1.img_msg = resp1.image
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
demo_graspEventdetect.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def ik_quaternion(self, quaternion_pose):
"""Take a xyz qxqyqzqw stamped pose and convert it to joint angles
using IK. You can call self.limb.move_to_joint_positions to
move to those angles
"""
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None
demo_vision_poseest_pickplace.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def ik_quaternion(self, quaternion_pose):
"""Take a xyz qxqyqzqw stamped pose and convert it to joint angles
using IK. You can call self.limb.move_to_joint_positions to
move to those angles
"""
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None
demo_graspsuccessExp.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def ik_quaternion(self, quaternion_pose):
"""Take a xyz qxqyqzqw stamped pose and convert it to joint angles
using IK. You can call self.limb.move_to_joint_positions to
move to those angles
"""
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
# convert response to joint position control dictionary
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None