python类wait_for_service()的实例源码

api.py 文件源码 项目:openag_brain 作者: OpenAgInitiative 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def perform_service_call(service_name):
    """
    POST /api/<version>/service/<service_name>

    POST to a service to change it somehow. service_name may be a path.

    For example, to start an environmental recipe in an environment, use the
    start_recipe service:

        POST /api/<version>/service/<environment_id>/start_recipe {"recipe_id": <id>}
    """
    # Add leading slash to service_name. ROS qualifies all services with a
    # leading slash.
    service_name = '/' + service_name
    # Find the class that ROS autogenerates from srv files that is associated
    # with this service.
    try:
        ServiceClass = rosservice.get_service_class_by_name(service_name)
    except rosservice.ROSServiceException as e:
        return error(e)
    # If we made it this far, the service exists.
    # Wait for service to be ready before attempting to use it.
    try:
        rospy.wait_for_service(service_name, 1)
    except rospy.ROSException as e:
        raise socket.error()
    # Get JSON args if they exist. If they don't, treat as if empty array
    # was passed.
    json = request.get_json(silent=True)
    args = json if json else []
    service_proxy = rospy.ServiceProxy(service_name, ServiceClass)
    try:
        # Unpack the list of args and pass to service_proxy function.
        res = service_proxy(*args)
    except (rospy.ServiceException, TypeError) as e:
        return error(e)
    status_code = 200 if getattr(res, "success", True) else 400
    data = {k: getattr(res, k) for k in res.__slots__}
    return jsonify(data), status_code
test_recipe_handler.py 文件源码 项目:openag_brain 作者: OpenAgInitiative 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def call_service(service_name, result_msg, result_value, *args):
        # Add leading slash to service_name. ROS qualifies all services with a
        # leading slash.
        # Find the class that ROS autogenerates from srv files that is associated
        # with this service.
        ServiceClass = rosservice.get_service_class_by_name(service_name)
        rospy.wait_for_service(service_name, 1)
        service_proxy = rospy.ServiceProxy(service_name, ServiceClass)
        res = service_proxy(*args)
        print(service_name)
        print("-----------######-------------")
        print(res)
        assert getattr(res, "success", result_value)
        #assert res
crazyflie.py 文件源码 项目:crazyswarm 作者: USC-ACTLab 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self):
        rospy.wait_for_service("/next_phase")
        self.nextPhase = rospy.ServiceProxy("/next_phase", Empty)
crazyflie.py 文件源码 项目:crazyswarm 作者: USC-ACTLab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        rospy.init_node("CrazyflieAPI", anonymous=False)
        rospy.wait_for_service("/emergency")
        self.emergencyService = rospy.ServiceProxy("/emergency", Empty)
        rospy.wait_for_service("/takeoff")
        self.takeoffService = rospy.ServiceProxy("/takeoff", Takeoff)
        rospy.wait_for_service("/land")
        self.landService = rospy.ServiceProxy("/land", Land)
        rospy.wait_for_service("/start_trajectory");
        self.startTrajectoryService = rospy.ServiceProxy("/start_trajectory", StartTrajectory)
        rospy.wait_for_service("/start_trajectory_reversed");
        self.startTrajectoryReversedService = rospy.ServiceProxy("/start_trajectory_reversed", StartTrajectoryReversed)
        rospy.wait_for_service("/start_ellipse")
        self.ellipseService = rospy.ServiceProxy("/start_ellipse", StartEllipse)
        rospy.wait_for_service("/start_canned_trajectory")
        self.startCannedTrajectoryService = rospy.ServiceProxy("/start_canned_trajectory", StartCannedTrajectory)
        rospy.wait_for_service("/go_home");
        self.goHomeService = rospy.ServiceProxy("/go_home", GoHome)
        rospy.wait_for_service("/update_params")
        self.updateParamsService = rospy.ServiceProxy("/update_params", UpdateParams)

        with open("../launch/crazyflies.yaml", 'r') as ymlfile:
            cfg = yaml.load(ymlfile)

        self.tf = TransformListener()

        self.crazyflies = []
        self.crazyfliesById = dict()
        for crazyflie in cfg["crazyflies"]:
            id = int(crazyflie["id"])
            initialPosition = crazyflie["initialPosition"]
            cf = Crazyflie(id, initialPosition, self.tf)
            self.crazyflies.append(cf)
            self.crazyfliesById[id] = cf
motion_editor.py 文件源码 项目:multi-contact-zmp 作者: stephane-caron 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def init_ros():
    global compute_com_area
    global compute_support_area
    rospy.init_node('real_time_control')
    rospy.wait_for_service(
        '/contact_stability/static/compute_support_area')
    rospy.wait_for_service(
        '/contact_stability/pendular/compute_support_area')
    compute_com_area = rospy.ServiceProxy(
        '/contact_stability/static/compute_support_area',
        contact_stability.srv.StaticArea)
    compute_support_area = rospy.ServiceProxy(
        '/contact_stability/pendular/compute_support_area',
        contact_stability.srv.PendularArea)
test_task.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_services(self):

        '''
        Test varius services that aren'ts tested individually
        '''

        rospy.wait_for_service("mytask_step_name")
        proxy = rospy.ServiceProxy(
            "mytask_step_name",
            Message 
        )

        response = proxy(MessageRequest())
        self.assertEqual(response.value, 'load')

        proxy = rospy.ServiceProxy(
            "mytask_task_payload",
            TaskPayload 
        )

        response = proxy()
        self.assertEqual(response.status, TaskStatusRequest.RUNNING)
        self.assertFalse(response.did_fail)

        proxy = rospy.ServiceProxy(
            "mytask_next_step",
            NextStep 
        )

        response = proxy(status=TaskStepStatus.SUCCESS)
        self.assertEqual(response.name, 'say_hello')
        response = proxy(status=TaskStepStatus.FAILURE)
        self.assertEqual(response.name, 'abort')
test_task.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_status(self):
        rospy.wait_for_service("mytask_status")
        proxy = rospy.ServiceProxy(
            "mytask_status",
            TaskStatus 
        )

        response = proxy(TaskStatusRequest())
        self.assertEqual(response.status, TaskStatusRequest.RUNNING)
base.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def service_builder(self, service_def):

        """
        Method for generating service endpoints for this Client.

        Each service contains a type and a service definition, as
        defined in the service's .srv file.

        For each service, a method is attached to the client that provides an
        endpoint with which other nodes can interact with the service.

        Because services must be uniquely namespaced, each service proxy will be
        exposed externally omitting the class's namespace as defined at
        construction time, but mapping to a service that uses the service
        class's namepspace. This means service proxies map as follows

        self.method('some message')       ->       /self.namespace_method

        Args:
            service (string): the name of the service
        """

        service_name, service_type = service_def
        namespaced_service = '{}_{}'.format(self.namespace, service_name)

        def service_method(self, *args, **kwargs):
            rospy.wait_for_service(namespaced_service)
            service_proxy = rospy.ServiceProxy(namespaced_service, service_type)
            return service_proxy(self, *args, **kwargs)

        setattr(self, service_name, service_method)
laser_assemble.py 文件源码 项目:srcsim2017 作者: ZarjRobotics 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        """ Run our code """
        rospy.loginfo("Start laser test code")
        rospy.wait_for_service("assemble_scans2")

        # Todo - publish the spin logic...
        self.laser_publisher = rospy.Publisher("/multisense/lidar_cloud", PointCloud, queue_size=10)
        self.scan_service = rospy.ServiceProxy('assemble_scans', AssembleScans)
        #self.bridge = CvBridge()

        #self.zarj_os.zps.look_down()

        while True:
            begin = rospy.get_rostime()
            rospy.sleep(3.0)
            resp = self.scan_service(begin, rospy.get_rostime())
            self.laser_publisher.publish(resp.cloud)
            #print "Got cloud {}x{}".format(resp.cloud.height, resp.cloud.width)
            img = self.create_image_from_cloud(resp.cloud.points)
            cv2.destroyAllWindows()
            print "New image"
            cv2.imshow("My image", img)
            cv2.waitKey(1)
            #print resp
            #cv_image = self.bridge.imgmsg_to_cv2(resp.cloud, "bgr8")
            #cv2.imshow("Point cloud", cv_image)
ros_services.py 文件源码 项目:aide 作者: Lambda-3 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_service(self):
        """
        Returns the service which then can be called.

        !! Does not call the service.

        :return: the service. 
        """
        rospy.wait_for_service(self.SERVICE_CHANNEL)
        return rospy.ServiceProxy(self.SERVICE_CHANNEL, self.service_class)
ros_services.py 文件源码 项目:aide 作者: Lambda-3 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def call_service(self, **kwargs):
        """
        Calls the service.

        :param kwargs: Arguments of the service.
        :return: Result of the service call.
        """
        rospy.wait_for_service(self.SERVICE_CHANNEL)
        return rospy.ServiceProxy(self.SERVICE_CHANNEL, self.service_class)(**kwargs)


# Create a list of all Services that are implemented
robot_inference_client.py 文件源码 项目:ros-robot-inference 作者: anchen1011 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def robot_inference_client(x):
    rospy.wait_for_service('inference')
    try:
        infer = rospy.ServiceProxy('inference', Inference)
        resp1 = infer(x)
        return resp1.inference
    except rospy.ServiceException, e:
        print ("Service call failed: %s"%e)
visual_mpc_client.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_endeffector_pos(self, pos_only=True):
        """
        :param pos_only: only return postion
        :return:
        """

        fkreq = SolvePositionFKRequest()
        joints = JointState()
        joints.name = self.ctrl.limb.joint_names()
        joints.position = [self.ctrl.limb.joint_angle(j)
                        for j in joints.name]

        # Add desired pose for forward kinematics
        fkreq.configuration.append(joints)
        fkreq.tip_names.append('right_hand')
        try:
            rospy.wait_for_service(self.name_of_service, 5)
            resp = self.fksvc(fkreq)
        except (rospy.ServiceException, rospy.ROSException), e:
            rospy.logerr("Service call failed: %s" % (e,))
            return False

        pos = np.array([resp.pose_stamp[0].pose.position.x,
                         resp.pose_stamp[0].pose.position.y,
                         resp.pose_stamp[0].pose.position.z])
        return pos
visual_mpc_client.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def query_action(self):

        if self.use_robot:
            if self.use_aux:
                self.recorder.get_aux_img()
                imageaux1 = self.recorder.ltob_aux1.img_msg
            else:
                imageaux1 = np.zeros((64, 64, 3), dtype=np.uint8)
                imageaux1 = self.bridge.cv2_to_imgmsg(imageaux1)

            imagemain = self.bridge.cv2_to_imgmsg(self.recorder.ltob.img_cropped)
            state = self.get_endeffector_pos()
        else:
            imagemain = np.zeros((64,64,3))
            imagemain = self.bridge.cv2_to_imgmsg(imagemain)
            imageaux1 = self.bridge.cv2_to_imgmsg(self.test_img)
            state = np.zeros(3)

        try:
            rospy.wait_for_service('get_action', timeout=240)
            get_action_resp = self.get_action_func(imagemain, imageaux1,
                                              tuple(state),
                                              tuple(self.desig_pos_main.flatten()),
                                              tuple(self.goal_pos_main.flatten()))

            action_vec = get_action_resp.action

        except (rospy.ServiceException, rospy.ROSException), e:
            rospy.logerr("Service call failed: %s" % (e,))
            raise ValueError('get action service call failed')
        return action_vec
forward_kinematics.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def fk_service_client(limb = "right"):
    ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService"
    fksvc = rospy.ServiceProxy(ns, SolvePositionFK)
    fkreq = SolvePositionFKRequest()
    joints = JointState()
    joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3',
                   'right_j4', 'right_j5', 'right_j6']
    joints.position = [0.763331, 0.415979, -1.728629, 1.482985,
                       -1.135621, -1.674347, -0.496337]
    # Add desired pose for forward kinematics
    fkreq.configuration.append(joints)
    # Request forward kinematics from base to "right_hand" link
    fkreq.tip_names.append('right_hand')

    try:
        rospy.wait_for_service(ns, 5.0)
        resp = fksvc(fkreq)
    except (rospy.ServiceException, rospy.ROSException), e:
        rospy.logerr("Service call failed: %s" % (e,))
        return False

    # Check if result valid
    if (resp.isValid[0]):
        rospy.loginfo("SUCCESS - Valid Cartesian Solution Found")
        rospy.loginfo("\nFK Cartesian Solution:\n")
        rospy.loginfo("------------------")
        rospy.loginfo("Response Message:\n%s", resp)
    else:
        rospy.loginfo("INVALID JOINTS - No Cartesian Solution Found.")

    return True
robot_recorder.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init_traj(self, itr):
        assert self.instance_type == 'main'
        # request init service for auxiliary recorders
        if self.use_aux:
            try:
                rospy.wait_for_service('init_traj', timeout=1)
                resp1 = self.init_traj_func(itr, self.igrp)
            except (rospy.ServiceException, rospy.ROSException), e:
                rospy.logerr("Service call failed: %s" % (e,))
                raise ValueError('get_kinectdata service failed')

        self._init_traj_local(itr)

        if ((itr+1) % self.ngroup) == 0:
            self.igrp += 1
robot_recorder.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_aux_img(self):
        try:
            rospy.wait_for_service('get_kinectdata', 0.1)
            resp1 = self.get_kinectdata_func()
            self.ltob_aux1.img_msg = resp1.image
        except (rospy.ServiceException, rospy.ROSException), e:
            rospy.logerr("Service call failed: %s" % (e,))
            raise ValueError('get_kinectdata service failed')
demo_graspEventdetect.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def ik_quaternion(self, quaternion_pose):
        """Take a xyz qxqyqzqw stamped pose and convert it to joint angles
        using IK. You can call self.limb.move_to_joint_positions to
        move to those angles

        """
        node = ("ExternalTools/{}/PositionKinematicsNode/"
                "IKService".format(self.limb_name))
        ik_service = rospy.ServiceProxy(node, SolvePositionIK)
        ik_request = SolvePositionIKRequest()
        ik_request.pose_stamp.append(quaternion_pose)
        try:
            rospy.loginfo('ik: waiting for IK service...')
            rospy.wait_for_service(node, 5.0)
            ik_response = ik_service(ik_request)
        except (rospy.ServiceException, rospy.ROSException) as err:
            rospy.logerr("ik_move: service request failed: %r" % err)
        else:
            if ik_response.isValid[0]:
                rospy.loginfo("ik_move: valid joint configuration found")
                # convert response to joint position control dictionary
                limb_joints = dict(zip(ik_response.joints[0].name,
                                       ik_response.joints[0].position))
                return limb_joints
            else:
                rospy.logerr('ik_move: no valid configuration found')
                return None
demo_vision_poseest_pickplace.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ik_quaternion(self, quaternion_pose):
        """Take a xyz qxqyqzqw stamped pose and convert it to joint angles
        using IK. You can call self.limb.move_to_joint_positions to
        move to those angles

        """
        node = ("ExternalTools/{}/PositionKinematicsNode/"
                "IKService".format(self.limb_name))
        ik_service = rospy.ServiceProxy(node, SolvePositionIK)
        ik_request = SolvePositionIKRequest()
        ik_request.pose_stamp.append(quaternion_pose)
        try:
            rospy.loginfo('ik: waiting for IK service...')
            rospy.wait_for_service(node, 5.0)
            ik_response = ik_service(ik_request)
        except (rospy.ServiceException, rospy.ROSException) as err:
            rospy.logerr("ik_move: service request failed: %r" % err)
        else:
            if ik_response.isValid[0]:
                rospy.loginfo("ik_move: valid joint configuration found")
                # convert response to joint position control dictionary
                limb_joints = dict(zip(ik_response.joints[0].name,
                                       ik_response.joints[0].position))
                return limb_joints
            else:
                rospy.logerr('ik_move: no valid configuration found')
                return None
demo_graspsuccessExp.py 文件源码 项目:cu-perception-manipulation-stack 作者: correlllab 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def ik_quaternion(self, quaternion_pose):
        """Take a xyz qxqyqzqw stamped pose and convert it to joint angles
        using IK. You can call self.limb.move_to_joint_positions to
        move to those angles

        """
        node = ("ExternalTools/{}/PositionKinematicsNode/"
                "IKService".format(self.limb_name))
        ik_service = rospy.ServiceProxy(node, SolvePositionIK)
        ik_request = SolvePositionIKRequest()
        ik_request.pose_stamp.append(quaternion_pose)
        try:
            rospy.loginfo('ik: waiting for IK service...')
            rospy.wait_for_service(node, 5.0)
            ik_response = ik_service(ik_request)
        except (rospy.ServiceException, rospy.ROSException) as err:
            rospy.logerr("ik_move: service request failed: %r" % err)
        else:
            if ik_response.isValid[0]:
                rospy.loginfo("ik_move: valid joint configuration found")
                # convert response to joint position control dictionary
                limb_joints = dict(zip(ik_response.joints[0].name,
                                       ik_response.joints[0].position))
                return limb_joints
            else:
                rospy.logerr('ik_move: no valid configuration found')
                return None


问题


面经


文章

微信
公众号

扫码关注公众号