python类ServiceProxy()的实例源码

perception_base.py 文件源码 项目:mhri 作者: mhri 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, node_name):
        rospy.init_node(node_name, anonymous=False)

        try:
            conf_file = rospy.get_param('~config_file')
        except KeyError as e:
            rospy.logerr('You should set the parameter for perception config file...')
            quit()

        with open(os.path.abspath(conf_file)) as f:
            self.conf_data = yaml.load(f.read())
            rospy.loginfo('loading perception config file... %d perception(s) exists...'%len(self.conf_data.keys()))
            for item in self.conf_data.keys():
                rospy.loginfo('\033[92m  - %s: %d event(s) and %d data(s).\033[0m'%(item, len(self.conf_data[item]['events']), len(self.conf_data[item]['data'])))

        self.dict_srv_wr = {}
        self.dict_srv_rd = {}

        for item in self.conf_data.keys():
            if self.conf_data[item].has_key('target_memory'):
                memory_name = self.conf_data[item]['target_memory']
                rospy.loginfo('\033[94m[%s]\033[0m wait for bringup %s...'%(rospy.get_name(), memory_name))

                rospy.wait_for_service('/%s/write_data'%memory_name)
                self.dict_srv_wr[memory_name] = rospy.ServiceProxy('/%s/write_data'%memory_name, WriteData)
                self.dict_srv_rd[memory_name] = rospy.ServiceProxy('/%s/read_data'%memory_name, ReadData)

                self.register_data_to_memory(memory_name, item, self.conf_data[item]['data'])

        self.is_enable_perception = True
        rospy.Subscriber('%s/start'%rospy.get_name(), Empty, self.handle_start_perception)
        rospy.Subscriber('%s/stop'%rospy.get_name(), Empty, self.handle_stop_perception)

        self.pub_event = rospy.Publisher('forwarding_event', ForwardingEvent, queue_size=10)
        rospy.loginfo('\033[94m[%s]\033[0m initialize perception_base done...'%rospy.get_name())
perception_base.py 文件源码 项目:mhri 作者: mhri 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def register_data_to_memory(self, memory_name, perception_name, data):
        rospy.wait_for_service('/%s/register_data'%memory_name)
        srv_register = rospy.ServiceProxy('/%s/register_data'%memory_name, RegisterData)

        srv_req = RegisterDataRequest()
        srv_req.perception_name = perception_name
        srv_req.data = json.dumps(data)

        return srv_register(srv_req)
yumi_arm.py 文件源码 项目:yumipy 作者: BerkeleyAutomation 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __getattr__(self, name):
        """ Override the __getattr__ method so that function calls become server requests

        If the name is a method of the YuMiArm class, this returns a function that calls that
        function on the YuMiArm instance in the server. The wait_for_res argument is not available
        remotely and will always be set to True. This is to prevent odd desynchronized crashes

        Otherwise, the name is considered to be an attribute, and getattr is called on the
        YuMiArm instance in the server. Note that if it isn't an attribute either a RuntimeError
        will be raised.

        The difference here is that functions access the server *on call* and non-functions do
        *on getting the name*

        Also note that this is __getattr__, so things like __init__ and __dict__ WILL NOT trigger
        this function as the YuMiArm_ROS object already has these as attributes.
        """
        if name in YuMiArm.__dict__:
            def handle_remote_call(*args, **kwargs):
                """ Handle the remote call to some YuMiArm function.
                """
                rospy.wait_for_service(self.arm_service, timeout = self.timeout)
                arm = rospy.ServiceProxy(self.arm_service, ROSYumiArm)
                if 'wait_for_res' in kwargs:
                    kwargs['wait_for_res'] = True
                try:
                    response = arm(pickle.dumps(name), pickle.dumps(args), pickle.dumps(kwargs))
                except rospy.ServiceException, e:
                    raise RuntimeError("Service call failed: {0}".format(str(e)))
                return pickle.loads(response.ret)
            return handle_remote_call
        else:
            rospy.wait_for_service(self.arm_service, timeout = self.timeout)
            arm = rospy.ServiceProxy(self.arm_service, ROSYumiArm)
            try:
                response = arm(pickle.dumps('__getattribute__'), pickle.dumps(name), pickle.dumps(None))
            except rospy.ServiceException, e:
                raise RuntimeError("Could not get attribute: {0}".format(str(e)))
            return pickle.loads(response.ret)
capture_features.py 文件源码 项目:RoboND-Perception-Exercises 作者: udacity 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_normals(cloud):
    get_normals_prox = rospy.ServiceProxy('/feature_extractor/get_normals', GetNormals)
    return get_normals_prox(cloud).cluster
training_helper.py 文件源码 项目:RoboND-Perception-Exercises 作者: udacity 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def capture_sample():
    """ Captures a PointCloud2 using the sensor stick RGBD camera

        Args: None

        Returns:
            PointCloud2: a single point cloud from the RGBD camrea
    """
    get_model_state_prox = rospy.ServiceProxy('gazebo/get_model_state',GetModelState)
    model_state = get_model_state_prox('training_model','world')

    set_model_state_prox = rospy.ServiceProxy('gazebo/set_model_state', SetModelState)

    roll = random.uniform(0, 2*math.pi)
    pitch = random.uniform(0, 2*math.pi)
    yaw = random.uniform(0, 2*math.pi)
    quaternion = tf.transformations.quaternion_from_euler(roll, pitch, yaw)
    model_state.pose.orientation.x = quaternion[0]
    model_state.pose.orientation.y = quaternion[1]
    model_state.pose.orientation.z = quaternion[2]
    model_state.pose.orientation.w = quaternion[3]
    sms_req = SetModelStateRequest()
    sms_req.model_state.pose = model_state.pose
    sms_req.model_state.twist = model_state.twist
    sms_req.model_state.model_name = 'training_model'
    sms_req.model_state.reference_frame = 'world'
    set_model_state_prox(sms_req)

    return rospy.wait_for_message('/sensor_stick/point_cloud', PointCloud2)
training_helper.py 文件源码 项目:RoboND-Perception-Exercises 作者: udacity 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def initial_setup():
    """ Prepares the Gazebo world for generating training data.

        In particular, this routine turns off gravity, so that the objects
        spawned in front of the RGBD camera will not fall. It also deletes
        the ground plane, so that the only depth points produce will
        correspond to the object of interest (eliminating the need for
        clustering and segmentation as part of the trianing process)

        Args: None

        Returns: None
    """
    rospy.wait_for_service('gazebo/get_model_state')
    rospy.wait_for_service('gazebo/set_model_state')
    rospy.wait_for_service('gazebo/get_physics_properties')
    rospy.wait_for_service('gazebo/set_physics_properties')
    rospy.wait_for_service('gazebo/spawn_sdf_model')

    get_physics_properties_prox = rospy.ServiceProxy('gazebo/get_physics_properties', GetPhysicsProperties)
    physics_properties = get_physics_properties_prox()

    physics_properties.gravity.z = 0

    set_physics_properties_prox = rospy.ServiceProxy('gazebo/set_physics_properties', SetPhysicsProperties)
    set_physics_properties_prox(physics_properties.time_step,
                                physics_properties.max_update_rate,
                                physics_properties.gravity,
                                physics_properties.ode_config)


    delete_model_prox = rospy.ServiceProxy('gazebo/delete_model', DeleteModel)
    delete_model_prox('ground_plane')
training_helper.py 文件源码 项目:RoboND-Perception-Exercises 作者: udacity 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete_model():
    # Delete the old model if it's stil around
    delete_model_prox = rospy.ServiceProxy('gazebo/delete_model', DeleteModel)
    delete_model_prox('training_model')
api.py 文件源码 项目:openag_brain 作者: OpenAgInitiative 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def perform_service_call(service_name):
    """
    POST /api/<version>/service/<service_name>

    POST to a service to change it somehow. service_name may be a path.

    For example, to start an environmental recipe in an environment, use the
    start_recipe service:

        POST /api/<version>/service/<environment_id>/start_recipe {"recipe_id": <id>}
    """
    # Add leading slash to service_name. ROS qualifies all services with a
    # leading slash.
    service_name = '/' + service_name
    # Find the class that ROS autogenerates from srv files that is associated
    # with this service.
    try:
        ServiceClass = rosservice.get_service_class_by_name(service_name)
    except rosservice.ROSServiceException as e:
        return error(e)
    # If we made it this far, the service exists.
    # Wait for service to be ready before attempting to use it.
    try:
        rospy.wait_for_service(service_name, 1)
    except rospy.ROSException as e:
        raise socket.error()
    # Get JSON args if they exist. If they don't, treat as if empty array
    # was passed.
    json = request.get_json(silent=True)
    args = json if json else []
    service_proxy = rospy.ServiceProxy(service_name, ServiceClass)
    try:
        # Unpack the list of args and pass to service_proxy function.
        res = service_proxy(*args)
    except (rospy.ServiceException, TypeError) as e:
        return error(e)
    status_code = 200 if getattr(res, "success", True) else 400
    data = {k: getattr(res, k) for k in res.__slots__}
    return jsonify(data), status_code
test_recipe_handler.py 文件源码 项目:openag_brain 作者: OpenAgInitiative 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def call_service(service_name, result_msg, result_value, *args):
        # Add leading slash to service_name. ROS qualifies all services with a
        # leading slash.
        # Find the class that ROS autogenerates from srv files that is associated
        # with this service.
        ServiceClass = rosservice.get_service_class_by_name(service_name)
        rospy.wait_for_service(service_name, 1)
        service_proxy = rospy.ServiceProxy(service_name, ServiceClass)
        res = service_proxy(*args)
        print(service_name)
        print("-----------######-------------")
        print(res)
        assert getattr(res, "success", result_value)
        #assert res
crazyflie.py 文件源码 项目:crazyswarm 作者: USC-ACTLab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        rospy.wait_for_service("/next_phase")
        self.nextPhase = rospy.ServiceProxy("/next_phase", Empty)
crazyflie.py 文件源码 项目:crazyswarm 作者: USC-ACTLab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self):
        rospy.init_node("CrazyflieAPI", anonymous=False)
        rospy.wait_for_service("/emergency")
        self.emergencyService = rospy.ServiceProxy("/emergency", Empty)
        rospy.wait_for_service("/takeoff")
        self.takeoffService = rospy.ServiceProxy("/takeoff", Takeoff)
        rospy.wait_for_service("/land")
        self.landService = rospy.ServiceProxy("/land", Land)
        rospy.wait_for_service("/start_trajectory");
        self.startTrajectoryService = rospy.ServiceProxy("/start_trajectory", StartTrajectory)
        rospy.wait_for_service("/start_trajectory_reversed");
        self.startTrajectoryReversedService = rospy.ServiceProxy("/start_trajectory_reversed", StartTrajectoryReversed)
        rospy.wait_for_service("/start_ellipse")
        self.ellipseService = rospy.ServiceProxy("/start_ellipse", StartEllipse)
        rospy.wait_for_service("/start_canned_trajectory")
        self.startCannedTrajectoryService = rospy.ServiceProxy("/start_canned_trajectory", StartCannedTrajectory)
        rospy.wait_for_service("/go_home");
        self.goHomeService = rospy.ServiceProxy("/go_home", GoHome)
        rospy.wait_for_service("/update_params")
        self.updateParamsService = rospy.ServiceProxy("/update_params", UpdateParams)

        with open("../launch/crazyflies.yaml", 'r') as ymlfile:
            cfg = yaml.load(ymlfile)

        self.tf = TransformListener()

        self.crazyflies = []
        self.crazyfliesById = dict()
        for crazyflie in cfg["crazyflies"]:
            id = int(crazyflie["id"])
            initialPosition = crazyflie["initialPosition"]
            cf = Crazyflie(id, initialPosition, self.tf)
            self.crazyflies.append(cf)
            self.crazyfliesById[id] = cf
motion_editor.py 文件源码 项目:multi-contact-zmp 作者: stephane-caron 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def init_ros():
    global compute_com_area
    global compute_support_area
    rospy.init_node('real_time_control')
    rospy.wait_for_service(
        '/contact_stability/static/compute_support_area')
    rospy.wait_for_service(
        '/contact_stability/pendular/compute_support_area')
    compute_com_area = rospy.ServiceProxy(
        '/contact_stability/static/compute_support_area',
        contact_stability.srv.StaticArea)
    compute_support_area = rospy.ServiceProxy(
        '/contact_stability/pendular/compute_support_area',
        contact_stability.srv.PendularArea)
spqrel_ros.py 文件源码 项目:spqrel_tools 作者: LCAS 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _trigger_tobi(self, data):
        rospy.loginfo('trigger tobi')
        self._cam_sub = rospy.Subscriber(
            'spqrel_pepper/camera/front/camera/image_raw',
            Image, self._image_cb
        )
        self._depth_sub = rospy.Subscriber(
            'spqrel_pepper/camera/depth/camera/image_raw',
            Image, self._depth_cb
        )
        self._depth_info_sub = rospy.Subscriber(
            '/spqrel_pepper/camera/depth/camera/camera_info',
            CameraInfo, self._depth_info_cb
        )

        # d = rospy.ServiceProxy('/ms_face_api/detect', Detect)
        # r = DetectRequest()
        # r.topic = 'spqrel_pepper/camera/front/camera/image_raw'
        # res = d.call(r)
        # res_faces = []
        # for f in
        # res_dict = {
        #     'age': res.age,
        #     'gender': res.gender,
        #     'smile': res.smile
        # }
        # print json.dumps(res)
test_task.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_services(self):

        '''
        Test varius services that aren'ts tested individually
        '''

        rospy.wait_for_service("mytask_step_name")
        proxy = rospy.ServiceProxy(
            "mytask_step_name",
            Message 
        )

        response = proxy(MessageRequest())
        self.assertEqual(response.value, 'load')

        proxy = rospy.ServiceProxy(
            "mytask_task_payload",
            TaskPayload 
        )

        response = proxy()
        self.assertEqual(response.status, TaskStatusRequest.RUNNING)
        self.assertFalse(response.did_fail)

        proxy = rospy.ServiceProxy(
            "mytask_next_step",
            NextStep 
        )

        response = proxy(status=TaskStepStatus.SUCCESS)
        self.assertEqual(response.name, 'say_hello')
        response = proxy(status=TaskStepStatus.FAILURE)
        self.assertEqual(response.name, 'abort')
test_task.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_status(self):
        rospy.wait_for_service("mytask_status")
        proxy = rospy.ServiceProxy(
            "mytask_status",
            TaskStatus 
        )

        response = proxy(TaskStatusRequest())
        self.assertEqual(response.status, TaskStatusRequest.RUNNING)
base.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def service_builder(self, service_def):

        """
        Method for generating service endpoints for this Client.

        Each service contains a type and a service definition, as
        defined in the service's .srv file.

        For each service, a method is attached to the client that provides an
        endpoint with which other nodes can interact with the service.

        Because services must be uniquely namespaced, each service proxy will be
        exposed externally omitting the class's namespace as defined at
        construction time, but mapping to a service that uses the service
        class's namepspace. This means service proxies map as follows

        self.method('some message')       ->       /self.namespace_method

        Args:
            service (string): the name of the service
        """

        service_name, service_type = service_def
        namespaced_service = '{}_{}'.format(self.namespace, service_name)

        def service_method(self, *args, **kwargs):
            rospy.wait_for_service(namespaced_service)
            service_proxy = rospy.ServiceProxy(namespaced_service, service_type)
            return service_proxy(self, *args, **kwargs)

        setattr(self, service_name, service_method)
task_server.py 文件源码 项目:needybot-core 作者: needybot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def assign_task_proxies(self, task_name):
        """
        Assigns service proxies for specified task.

        Args:
            task_name(str): Name of task to lookup services for. Will assign proxies by
                            looking for service function with formatted string <task_name>_<service_name>
        """
        for task_arr in self.proxy_services: 
            service_name = "{}_{}".format(task_name, task_arr[0])
            #loginfo("WAITING ON SERVIVCE {}".format(service_name))
            rospy.wait_for_service(service_name)
            #loginfo("DONE WAITING ON SERVIVCE {}".format(service_name))
            setattr(
                self.blackboard,
                "{}_proxy".format(task_arr[0]),
                rospy.ServiceProxy(
                    service_name,
                    task_arr[1]
                )
            )

        if self.task_step_serve_sub:
            self.task_step_serve_sub.unregister()

        # Subscribe to topics to failure or success of task steps
        self.task_step_serve_sub = rospy.Subscriber(
            '/needybot/{}/step/serve'.format(task_name),
            ros_msg.String,
            self.task_step_serve_handler
        )
        # Call reset on task
        self.blackboard.reset_proxy(EmptyRequest())
laser_assemble.py 文件源码 项目:srcsim2017 作者: ZarjRobotics 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self):
        """ Run our code """
        rospy.loginfo("Start laser test code")
        rospy.wait_for_service("assemble_scans2")

        # Todo - publish the spin logic...
        self.laser_publisher = rospy.Publisher("/multisense/lidar_cloud", PointCloud, queue_size=10)
        self.scan_service = rospy.ServiceProxy('assemble_scans', AssembleScans)
        #self.bridge = CvBridge()

        #self.zarj_os.zps.look_down()

        while True:
            begin = rospy.get_rostime()
            rospy.sleep(3.0)
            resp = self.scan_service(begin, rospy.get_rostime())
            self.laser_publisher.publish(resp.cloud)
            #print "Got cloud {}x{}".format(resp.cloud.height, resp.cloud.width)
            img = self.create_image_from_cloud(resp.cloud.points)
            cv2.destroyAllWindows()
            print "New image"
            cv2.imshow("My image", img)
            cv2.waitKey(1)
            #print resp
            #cv_image = self.bridge.imgmsg_to_cv2(resp.cloud, "bgr8")
            #cv2.imshow("Point cloud", cv_image)
fc.py 文件源码 项目:srcsim2017 作者: ZarjRobotics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def start(self, name):
        """ Start the field computer """
        rospy.init_node(name)
        self.oclog('Getting robot_name')
        while not rospy.has_param('/ihmc_ros/robot_name'):
            print "-------------------------------------------------"
            print "Cannot run team_zarj_main.py, missing parameters!"
            print "Missing parameter '/ihmc_ros/robot_name'"
            print "Likely connection to ROS not present. Retry in 1 second."
            print "-------------------------------------------------"
            time.sleep(1)

        self.oclog('Booting ZarjOS')
        self.zarj = ZarjOS()

        self.start_task_service = rospy.ServiceProxy(
            "/srcsim/finals/start_task", StartTask)
        self.task_subscriber = rospy.Subscriber(
            "/srcsim/finals/task", Task, self.task_status)
        self.harness_subscriber = rospy.Subscriber(
            "/srcsim/finals/harness", Harness, self.harness_status)
        if HAS_SCORE:
            self.score_subscriber = rospy.Subscriber(
                "/srcsim/finals/score", Score, self.score_status)

        rate = rospy.Rate(10) # 10hz
        if self.task_subscriber.get_num_connections() == 0:
            self.oclog('waiting for task publisher...')
            while self.task_subscriber.get_num_connections() == 0:
                rate.sleep()
            if self.harness_subscriber.get_num_connections() == 0:
                self.oclog('waiting for harness publisher...')
                while self.harness_subscriber.get_num_connections() == 0:
                    rate.sleep()
            self.oclog('...got it, field computer is operational!')
find_cylinder_state.py 文件源码 项目:master_robot_strage 作者: nwpu-basketball-robot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        # self.cylinder_detector_client = rospy.ServiceProxy('cylinderdata',cylinder.cylinderdata)
        self.cylinder_opencv_client = rospy.ServiceProxy('cylinder_data',cylinder_opencv.cylinder_find)
        self.cylinder_laser_client = rospy.ServiceProxy('cylinder',cylinder_laser.cylinder)
        rospy.loginfo('[cylinder_state_pkg]->waiting cylinderdata service')
        self.cylinder_laser_client.wait_for_service()
        self.cylinder_opencv_client.wait_for_service()
        rospy.loginfo('[cylinder_state_pkg] -> connected to cylinder service')
    #?????????????????x?????????


问题


面经


文章

微信
公众号

扫码关注公众号