def register_service_raw(self, callback):
"""
Registers the service with a callback function.
:param callback: callback function to be executed when the service is called.
:return: service object.
"""
return rospy.Service(self.SERVICE_CHANNEL, self.service_class, callback)
python类Service()的实例源码
def robot_inference_server():
rospy.init_node('robot_inference_server')
s = rospy.Service('inference', Inference, handle)
print ("Ready to infer robot behavior.")
rospy.spin()
def init_traj(self, itr):
assert self.instance_type == 'main'
# request init service for auxiliary recorders
if self.use_aux:
try:
rospy.wait_for_service('init_traj', timeout=1)
resp1 = self.init_traj_func(itr, self.igrp)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
self._init_traj_local(itr)
if ((itr+1) % self.ngroup) == 0:
self.igrp += 1
def delete_traj(self, tr):
assert self.instance_type == 'main'
if self.use_aux:
try:
rospy.wait_for_service('delete_traj', 0.1)
resp1 = self.delete_traj_func(tr, self.igrp)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('delete traj service failed')
self._delete_traj_local(tr)
def get_aux_img(self):
try:
rospy.wait_for_service('get_kinectdata', 0.1)
resp1 = self.get_kinectdata_func()
self.ltob_aux1.img_msg = resp1.image
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
raise ValueError('get_kinectdata service failed')
def run(self):
self.reset_srv = rospy.Service(self.reset_srv_name, Reset, self._cb_reset)
self.go_to_start()
if self.demo:
self.torso.start_idle_motion('head')
self.torso.start_idle_motion('right_arm')
rospy.spin()
if self.demo:
self.torso.stop_idle_motion('head')
self.torso.stop_idle_motion('right_arm')
def run(self):
for service in [self.service_name_set_compliant]:
rospy.loginfo("Perception is waiting service {}".format(service))
rospy.wait_for_service(service)
self.set_torso_compliant_srv = rospy.ServiceProxy(self.service_name_set_compliant, SetTorsoCompliant)
rospy.Service(self.service_name_get, GetSensorialState, self.cb_get)
rospy.Service(self.service_name_record, Record, self.cb_record)
rospy.loginfo("Done, perception is up!")
rospy.spin()
def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
self.params = json.load(f)
self.button = Button(self.params)
self.rate = rospy.Rate(self.params['publish_rate'])
self.button.switch_led(False)
# Service callers
self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name'])
self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name'])
rospy.loginfo("Ergo node is waiting for poppy controllers...")
rospy.wait_for_service(self.robot_reach_srv_name)
rospy.wait_for_service(self.robot_compliant_srv_name)
self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget)
self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant)
rospy.loginfo("Controllers connected!")
self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1)
self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1)
self.goals = []
self.goal = 0.
self.joy_x = 0.
self.joy_y = 0.
self.motion_started_joy = 0.
self.js = JointState()
rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy)
rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js)
rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led)
self.t = rospy.Time.now()
self.srv_reset = None
self.extended = False
self.standby = False
self.last_activity = rospy.Time.now()
self.delta_t = rospy.Time.now()
def run_within_ros(self):
rospy.logwarn('Work Manager is using ROS to distribute work')
# Use ROS for work manager <-> controller comm
rospy.Service('work/get', GetWork, lambda req: GetWorkResponse(**self._cb_get_work(req.worker)))
rospy.Service('work/update', UpdateWorkStatus, lambda req: UpdateWorkStatusResponse(**self._cb_update_work(req.task, req.trial, req.worker, req.iteration, req.success)))
rospy.Service('work/add', AddWork, self._cb_add_work)
rospy.spin()
def run(self):
rospy.Service(self.service_name_perceive, Perceive, self.cb_perceive)
rospy.Service(self.service_name_produce, Produce, self.cb_produce)
rospy.Service(self.service_name_set_interest, SetFocus, self.cb_set_focus)
rospy.Service(self.service_name_set_iteration, SetIteration, self.cb_set_iteration)
rospy.Service(self.service_name_interests, GetInterests, self.cb_get_interests)
rospy.loginfo("Learning is up!")
rate = rospy.Rate(self.params['publish_rate'])
while not rospy.is_shutdown():
self.publish()
rate.sleep()
def publish(self):
if self.learning is not None:
with self.lock_iteration:
focus = copy(self.focus)
ready = copy(self.ready_for_interaction)
self.pub_ready.publish(Bool(data=ready))
self.pub_user_focus.publish(String(data=focus if focus is not None else ""))
self.pub_focus.publish(String(data=self.learning.get_last_focus()))
self.pub_iteration.publish(UInt32(data=self.learning.get_iterations()))
################################# Service callbacks
def run(self):
rospy.Service('recorder/record', RecordScene, self.cb_record)
rospy.spin()
cv2.destroyAllWindows()
def get_mission_by_id(req):
""" Service to update mission information with specific mission as given
by id.
Args:
req: GetMissionByID type request with field id corresponding to the
mission
Returns:
GetMissionByIdResponse which is true for success and false for
failure.
"""
with lock:
global msgs
try:
msgs = client.get_mission(req.id, frame)
except (ConnectionError, Timeout) as e:
rospy.logwarn(e)
return False, str(e)
except (ValueError, HTTPError) as e:
rospy.logerr(e)
return False, str(e)
except Exception as e:
rospy.logfatal(e)
return False, str(e)
rospy.loginfo("Using mission ID: %d", req.id)
return True, "Success"
def __init__(self):
super(Organizer, self).__init__()
rospy.Service("organizer/topic", AdvertiseUDP, self.__advertise_callback)
#self.udptopics_pub = rospy.Publisher("organizer/udptopics", TopicInfoArray, queue_size=1)
#self.topics.topics = []
def __init__(self, listenInt, interface, freq, essid, psswd, ip, nm, discoverOnce=True):
self.robotName = os.getenv('HOSTNAME')
self.tolerance = 20
self.x = 0
self.y = 0
self.client = WiFiTrilatClient()
self.discoverOnce = discoverOnce
rospy.init_node(self.robotName + "_wifitrilat_server")
#self.rssiPub = rospy.Publisher('/' + self.robotName + '/WiFiRSSI', WiFiRSSIMsg, queue_size=10)
self.service = rospy.Service("/" + self.robotName + "/WiFiTrilat", WiFiTrilat, self.handle_Trilat)
self.listenInt = listenInt
self.interface = interface
#self.mac = mac.lower()
self.freq = int(freq)
self.msgs = []
cli.execute_shell("ifconfig %s down" % self.listenInt)
#self.wifi = Wireless(self.interface).setFrequency("%.3f" % (float(self.freq) / 1000))
self.connectToNet(essid, psswd,ip, nm)
cli.execute_shell("ifconfig %s up" % self.listenInt)
self.purge = rospy.Timer(rospy.Duration(2), self.msgPurge)
self.heartbeat = rospy.Timer(rospy.Duration(1), self.heartbeat_call)
self.discoverTimer = rospy.Timer(rospy.Duration(20), self.findSelfPos)
sniff(iface=self.listenInt, prn=self.handler, store=0)
rospy.spin()
def __init__(self,side,ik=True):
limb.Limb.__init__(self,side)
self.side=side
self.DEFAULT_BAXTER_SPEED=0.3
if not side in ["left","right"]:
raise BaxterException,"Error non existing side: %s, please provide left or right"%side
self.post=Post(self)
self.__stop = False
self.simple=self
self._moving=False
self.moving_lock=Lock()
self.ik=ik
if self.ik:
self.ns = "/ExternalTools/%s/PositionKinematicsNode/IKService"%self.side
rospy.loginfo("Waiting for inverse kinematics service on %s..."%self.ns)
rospy.wait_for_service(self.ns)
self.iksvc = rospy.ServiceProxy(self.ns, SolvePositionIK)
rospy.loginfo("Waiting for inverse kinematics service DONE")
else:
rospy.loginfo("Skipping inverse kinematics service loading")
#self.simple_limb_srv = rospy.Service("/simple_limb/"+side,LimbPose,self.__cbLimbPose)
self._pub_speed=rospy.Publisher("/robot/limb/%s/set_speed_ratio"%self.side,Float64,queue_size=1)
while not rospy.is_shutdown() and self._pub_speed.get_num_connections() < 1:
rospy.sleep(0.1)
#self.setSpeed(3)
# def __cbLimbPose(self,msg):
# cmd = msg.command
# if cmd == "goToPose":
# resp = self.goToPose(msg.pose, msg.speed, msg.position_tolerance, msg.orientation_tolerance, msg.joint_tolerance, msg.speed_tolerance, msg.timeout, cartesian, msg.path_tolerance)
# elif cmd=="goToAngles":
# resp = self.goToAngles(msg.angles, msg.speed, msg.joint_tolerance_plan, msg.joint_tolerance, msg.speed_tolerance, msg.timeout, msg.path_tolerance)
# return LimbPoseResponse(resp)
def getAnglesFromPose(self,pose):
if type(pose)==Pose:
goal=PoseStamped()
goal.header.frame_id="/base"
goal.pose=pose
else:
goal=pose
if not self.ik:
rospy.logerror("ERROR: Inverse Kinematics service was not loaded")
return None
goalstr="%f,%f,%f"%(goal.pose.position.x,goal.pose.position.y,goal.pose.position.z)
ikreq = SolvePositionIKRequest()
ikreq.pose_stamp.append(goal)
try:
resp = self.iksvc(ikreq)
if (resp.isValid[0]):
return resp.joints[0]
else:
rospy.logerr("FAILURE - No Valid Joint Solution Found for %s"%goalstr)
return None
except rospy.ServiceException,e :
rospy.loginfo("Service call failed: %s" % (e,))
return None
__init__.py 文件源码
项目:lidapy-framework
作者: CognitiveComputingResearchGroup
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def get_service(self, service_name, service_class, callback):
return rospy.Service(service_name, service_class, callback)
def create_services(self):
print("Starting services...")
# Camera control services
self._services.append(rospy.Service('get_camera_view', services.GetCameraImage, self.handle_get_camera_image))
self._services.append(rospy.Service('get_camera_view_with_filename', services.GetCameraImageWithFilename,
self.handle_get_camera_image))
self._services.append(rospy.Service('get_camera_location', services.GetCameraLocation,
self.handle_get_camera_location))
self._services.append(rospy.Service('get_camera_rotation', services.GetCameraRotation,
self.handle_get_camera_rotation))
self._services.append(rospy.Service('get_viewmode', services.GetViewmode, self.handle_get_viewmode))
self._services.append(rospy.Service('move_camera', services.MoveCamera, self.handle_move_camera))
self._services.append(rospy.Service('set_camera_location', services.SetCameraLocation,
self.handle_set_camera_location))
self._services.append(rospy.Service('set_camera_rotation', services.SetCameraRotation,
self.handle_set_camera_rotation))
self._services.append(rospy.Service('set_viewmode', services.SetViewmode, self.handle_set_viewmode))
# object control services
self._services.append(rospy.Service('get_object_color', services.GetObjectColor, self.handle_get_object_color))
self._services.append(rospy.Service('set_object_color', services.SetObjectColor, self.handle_set_object_color))
self._services.append(rospy.Service('get_object_location', services.GetObjectLocation,
self.handle_get_object_location))
self._services.append(rospy.Service('get_object_rotation', services.GetObjectRotation,
self.handle_get_object_rotation))
self._services.append(rospy.Service('set_object_location', services.SetObjectLocation,
self.handle_set_object_location))
self._services.append(rospy.Service('set_object_rotation', services.SetObjectRotation,
self.handle_set_object_rotation))
def get_camera_image(self, camera_id, location, rotation):
roll, pitch, yaw = rotation
self._client_lock.acquire()
self._client.request("vset /camera/{0}/location {1} {2} {3}".format(camera_id, location[0],
location[1], location[2]))
self._client.request("vset /camera/{0}/rotation {1} {2} {3}".format(camera_id, pitch, yaw, roll))
image_filename = self._client.request("vget /camera/{0}/lit".format(camera_id))
self._client_lock.release()
return image_filename
# Service Handlers