python类Service()的实例源码

ros_services.py 文件源码 项目:aide 作者: Lambda-3 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def register_service_raw(self, callback):
        """
        Registers the service with a callback function.

        :param callback: callback function to be executed when the service is called. 
        :return: service object.
        """

        return rospy.Service(self.SERVICE_CHANNEL, self.service_class, callback)
robot_inference_server.py 文件源码 项目:ros-robot-inference 作者: anchen1011 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def robot_inference_server():
    rospy.init_node('robot_inference_server')
    s = rospy.Service('inference', Inference, handle)
    print ("Ready to infer robot behavior.")
    rospy.spin()
robot_recorder.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def init_traj(self, itr):
        assert self.instance_type == 'main'
        # request init service for auxiliary recorders
        if self.use_aux:
            try:
                rospy.wait_for_service('init_traj', timeout=1)
                resp1 = self.init_traj_func(itr, self.igrp)
            except (rospy.ServiceException, rospy.ROSException), e:
                rospy.logerr("Service call failed: %s" % (e,))
                raise ValueError('get_kinectdata service failed')

        self._init_traj_local(itr)

        if ((itr+1) % self.ngroup) == 0:
            self.igrp += 1
robot_recorder.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def delete_traj(self, tr):
        assert self.instance_type == 'main'
        if self.use_aux:
            try:
                rospy.wait_for_service('delete_traj', 0.1)
                resp1 = self.delete_traj_func(tr, self.igrp)
            except (rospy.ServiceException, rospy.ROSException), e:
                rospy.logerr("Service call failed: %s" % (e,))
                raise ValueError('delete traj service failed')
        self._delete_traj_local(tr)
robot_recorder.py 文件源码 项目:visual_mpc 作者: febert 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_aux_img(self):
        try:
            rospy.wait_for_service('get_kinectdata', 0.1)
            resp1 = self.get_kinectdata_func()
            self.ltob_aux1.img_msg = resp1.image
        except (rospy.ServiceException, rospy.ROSException), e:
            rospy.logerr("Service call failed: %s" % (e,))
            raise ValueError('get_kinectdata service failed')
torso.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        self.reset_srv = rospy.Service(self.reset_srv_name, Reset, self._cb_reset)
        self.go_to_start()
        if self.demo:
            self.torso.start_idle_motion('head')
            self.torso.start_idle_motion('right_arm')

        rospy.spin()

        if self.demo:
            self.torso.stop_idle_motion('head')
            self.torso.stop_idle_motion('right_arm')
perception.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        for service in [self.service_name_set_compliant]:
            rospy.loginfo("Perception is waiting service {}".format(service))
            rospy.wait_for_service(service)
        self.set_torso_compliant_srv = rospy.ServiceProxy(self.service_name_set_compliant, SetTorsoCompliant)
        rospy.Service(self.service_name_get, GetSensorialState, self.cb_get)
        rospy.Service(self.service_name_record, Record, self.cb_record)
        rospy.loginfo("Done, perception is up!")
        rospy.spin()
ergo.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        self.rospack = RosPack()
        with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
            self.params = json.load(f)
        self.button = Button(self.params)
        self.rate = rospy.Rate(self.params['publish_rate'])
        self.button.switch_led(False)

        # Service callers
        self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name'])
        self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name'])
        rospy.loginfo("Ergo node is waiting for poppy controllers...")
        rospy.wait_for_service(self.robot_reach_srv_name)
        rospy.wait_for_service(self.robot_compliant_srv_name)
        self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget)
        self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant)
        rospy.loginfo("Controllers connected!")

        self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1)
        self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1)

        self.goals = []
        self.goal = 0.
        self.joy_x = 0.
        self.joy_y = 0.
        self.motion_started_joy = 0.
        self.js = JointState()
        rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy)
        rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js)
        rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led)

        self.t = rospy.Time.now()
        self.srv_reset = None
        self.extended = False
        self.standby = False
        self.last_activity = rospy.Time.now()
        self.delta_t = rospy.Time.now()
work_manager.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run_within_ros(self):
        rospy.logwarn('Work Manager is using ROS to distribute work')

        # Use ROS for work manager <-> controller comm
        rospy.Service('work/get', GetWork, lambda req: GetWorkResponse(**self._cb_get_work(req.worker)))
        rospy.Service('work/update', UpdateWorkStatus, lambda req: UpdateWorkStatusResponse(**self._cb_update_work(req.task, req.trial, req.worker, req.iteration, req.success)))
        rospy.Service('work/add', AddWork, self._cb_add_work)

        rospy.spin()
learning.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        rospy.Service(self.service_name_perceive, Perceive, self.cb_perceive)
        rospy.Service(self.service_name_produce, Produce, self.cb_produce)
        rospy.Service(self.service_name_set_interest, SetFocus, self.cb_set_focus)
        rospy.Service(self.service_name_set_iteration, SetIteration, self.cb_set_iteration)
        rospy.Service(self.service_name_interests, GetInterests, self.cb_get_interests)
        rospy.loginfo("Learning is up!")

        rate = rospy.Rate(self.params['publish_rate'])
        while not rospy.is_shutdown():
            self.publish()
            rate.sleep()
learning.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def publish(self):
        if self.learning is not None:
            with self.lock_iteration:
                focus = copy(self.focus)
                ready = copy(self.ready_for_interaction)

            self.pub_ready.publish(Bool(data=ready))
            self.pub_user_focus.publish(String(data=focus if focus is not None else ""))
            self.pub_focus.publish(String(data=self.learning.get_last_focus()))
            self.pub_iteration.publish(UInt32(data=self.learning.get_iterations()))


    ################################# Service callbacks
recorder.py 文件源码 项目:APEX 作者: ymollard 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
        rospy.Service('recorder/record', RecordScene, self.cb_record)
        rospy.spin()
        cv2.destroyAllWindows()
missions_client.py 文件源码 项目:ros-interop 作者: mcgill-robotics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_mission_by_id(req):
    """ Service to update mission information with specific mission as given
    by id.

    Args:
        req: GetMissionByID type request with field id corresponding to the
             mission

    Returns:
        GetMissionByIdResponse which is true for success and false for
        failure.
    """
    with lock:
        global msgs
        try:
            msgs = client.get_mission(req.id, frame)
        except (ConnectionError, Timeout) as e:
            rospy.logwarn(e)
            return False, str(e)
        except (ValueError, HTTPError) as e:
            rospy.logerr(e)
            return False, str(e)
        except Exception as e:
            rospy.logfatal(e)
            return False, str(e)

    rospy.loginfo("Using mission ID: %d", req.id)
    return True, "Success"
organizer.py 文件源码 项目:multimaster_udp 作者: AlexisTM 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        super(Organizer, self).__init__()
        rospy.Service("organizer/topic", AdvertiseUDP, self.__advertise_callback)
        #self.udptopics_pub = rospy.Publisher("organizer/udptopics", TopicInfoArray, queue_size=1)
        #self.topics.topics = []
WiFiTrilatSrv.py 文件源码 项目:SwarmRobotics 作者: superit23 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, listenInt, interface, freq, essid, psswd, ip, nm, discoverOnce=True):
    self.robotName = os.getenv('HOSTNAME')
    self.tolerance = 20
    self.x = 0
    self.y = 0
    self.client = WiFiTrilatClient()

    self.discoverOnce = discoverOnce

    rospy.init_node(self.robotName + "_wifitrilat_server")

    #self.rssiPub = rospy.Publisher('/' + self.robotName + '/WiFiRSSI', WiFiRSSIMsg, queue_size=10)
    self.service = rospy.Service("/" + self.robotName + "/WiFiTrilat", WiFiTrilat, self.handle_Trilat)

    self.listenInt = listenInt
    self.interface = interface
    #self.mac = mac.lower()
    self.freq = int(freq)

    self.msgs = []

    cli.execute_shell("ifconfig %s down" % self.listenInt)
    #self.wifi = Wireless(self.interface).setFrequency("%.3f" % (float(self.freq) / 1000))
    self.connectToNet(essid, psswd,ip, nm)
    cli.execute_shell("ifconfig %s up" % self.listenInt)

    self.purge = rospy.Timer(rospy.Duration(2), self.msgPurge)
    self.heartbeat = rospy.Timer(rospy.Duration(1), self.heartbeat_call)
    self.discoverTimer = rospy.Timer(rospy.Duration(20), self.findSelfPos)


    sniff(iface=self.listenInt, prn=self.handler, store=0)
    rospy.spin()
simple_limb.py 文件源码 项目:baxter_gui 作者: HumaRobotics 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self,side,ik=True):
        limb.Limb.__init__(self,side)
        self.side=side

        self.DEFAULT_BAXTER_SPEED=0.3

        if not side in ["left","right"]:
            raise BaxterException,"Error non existing side: %s, please provide left or right"%side

        self.post=Post(self)
        self.__stop = False
        self.simple=self
        self._moving=False
        self.moving_lock=Lock()

        self.ik=ik        
        if self.ik:            
            self.ns = "/ExternalTools/%s/PositionKinematicsNode/IKService"%self.side    
            rospy.loginfo("Waiting for inverse kinematics service on %s..."%self.ns)
            rospy.wait_for_service(self.ns)
            self.iksvc = rospy.ServiceProxy(self.ns, SolvePositionIK)
            rospy.loginfo("Waiting for inverse kinematics service DONE")
        else:
            rospy.loginfo("Skipping inverse kinematics service loading")

        #self.simple_limb_srv = rospy.Service("/simple_limb/"+side,LimbPose,self.__cbLimbPose)

        self._pub_speed=rospy.Publisher("/robot/limb/%s/set_speed_ratio"%self.side,Float64,queue_size=1)
        while not rospy.is_shutdown() and self._pub_speed.get_num_connections() < 1:
            rospy.sleep(0.1)
        #self.setSpeed(3)


#     def __cbLimbPose(self,msg):
#         cmd = msg.command
#         if cmd == "goToPose":
#             resp = self.goToPose(msg.pose, msg.speed, msg.position_tolerance, msg.orientation_tolerance, msg.joint_tolerance, msg.speed_tolerance, msg.timeout, cartesian, msg.path_tolerance)
#         elif cmd=="goToAngles":
#             resp = self.goToAngles(msg.angles, msg.speed, msg.joint_tolerance_plan, msg.joint_tolerance, msg.speed_tolerance, msg.timeout, msg.path_tolerance)
#         return LimbPoseResponse(resp)
simple_limb.py 文件源码 项目:baxter_gui 作者: HumaRobotics 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def getAnglesFromPose(self,pose):
        if type(pose)==Pose:
            goal=PoseStamped()
            goal.header.frame_id="/base"
            goal.pose=pose
        else:
            goal=pose


        if not self.ik:
            rospy.logerror("ERROR: Inverse Kinematics service was not loaded")
            return None
        goalstr="%f,%f,%f"%(goal.pose.position.x,goal.pose.position.y,goal.pose.position.z)
        ikreq = SolvePositionIKRequest()

        ikreq.pose_stamp.append(goal)
        try:
            resp = self.iksvc(ikreq)
            if (resp.isValid[0]):
                return resp.joints[0]
            else:
                rospy.logerr("FAILURE - No Valid Joint Solution Found for %s"%goalstr)
                return None
        except rospy.ServiceException,e :
            rospy.loginfo("Service call failed: %s" % (e,))
            return None
__init__.py 文件源码 项目:lidapy-framework 作者: CognitiveComputingResearchGroup 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_service(self, service_name, service_class, callback):
        return rospy.Service(service_name, service_class, callback)
passthrough_node.py 文件源码 项目:unrealcv-ros 作者: jskinn 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_services(self):
        print("Starting services...")
        # Camera control services
        self._services.append(rospy.Service('get_camera_view', services.GetCameraImage, self.handle_get_camera_image))
        self._services.append(rospy.Service('get_camera_view_with_filename', services.GetCameraImageWithFilename,
                                            self.handle_get_camera_image))
        self._services.append(rospy.Service('get_camera_location', services.GetCameraLocation,
                                            self.handle_get_camera_location))
        self._services.append(rospy.Service('get_camera_rotation', services.GetCameraRotation,
                                            self.handle_get_camera_rotation))
        self._services.append(rospy.Service('get_viewmode', services.GetViewmode, self.handle_get_viewmode))
        self._services.append(rospy.Service('move_camera', services.MoveCamera, self.handle_move_camera))
        self._services.append(rospy.Service('set_camera_location', services.SetCameraLocation,
                                            self.handle_set_camera_location))
        self._services.append(rospy.Service('set_camera_rotation', services.SetCameraRotation,
                                            self.handle_set_camera_rotation))
        self._services.append(rospy.Service('set_viewmode', services.SetViewmode, self.handle_set_viewmode))

        # object control services
        self._services.append(rospy.Service('get_object_color', services.GetObjectColor, self.handle_get_object_color))
        self._services.append(rospy.Service('set_object_color', services.SetObjectColor, self.handle_set_object_color))
        self._services.append(rospy.Service('get_object_location', services.GetObjectLocation,
                                            self.handle_get_object_location))
        self._services.append(rospy.Service('get_object_rotation', services.GetObjectRotation,
                                            self.handle_get_object_rotation))
        self._services.append(rospy.Service('set_object_location', services.SetObjectLocation,
                                            self.handle_set_object_location))
        self._services.append(rospy.Service('set_object_rotation', services.SetObjectRotation,
                                            self.handle_set_object_rotation))
passthrough_node.py 文件源码 项目:unrealcv-ros 作者: jskinn 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_camera_image(self, camera_id, location, rotation):
        roll, pitch, yaw = rotation
        self._client_lock.acquire()
        self._client.request("vset /camera/{0}/location {1} {2} {3}".format(camera_id, location[0],
                                                                            location[1], location[2]))
        self._client.request("vset /camera/{0}/rotation {1} {2} {3}".format(camera_id, pitch, yaw, roll))
        image_filename = self._client.request("vget /camera/{0}/lit".format(camera_id))
        self._client_lock.release()
        return image_filename

    # Service Handlers


问题


面经


文章

微信
公众号

扫码关注公众号