def ik_solve(limb, pos, orient):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/IKService"
iksvc = rospy.ServiceProxy(ns, SolvePositionIK)
ikreq = SolvePositionIKRequest()
hdr = Header(stamp=rospy.Time.now(), frame_id='base')
poses = {
str(limb): PoseStamped(header=hdr, pose=Pose(position=pos, orientation=orient))}
ikreq.pose_stamp.append(poses[limb])
try:
rospy.wait_for_service(ns, 5.0)
resp = iksvc(ikreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return 1
if resp.isValid[0]:
limb_joints = dict(zip(resp.joints[0].name, resp.joints[0].position))
return limb_joints
else:
return Errors.RaiseNotReachable(pos)
python类ServiceProxy()的实例源码
def __init__(self):
rospy.init_node('multiplexer_node', anonymous=False)
rospy.loginfo('\033[93m[%s]\033[0m wait for bringup social_events_memory...'%rospy.get_name())
rospy.wait_for_service('social_events_memory/read_data')
self.rd_event_mem = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
rospy.Subscriber('forwarding_event', ForwardingEvent, self.handle_social_events)
self.pub_rasing_event = rospy.Publisher('raising_events', RaisingEvents, queue_size=10)
self.events_queue = Queue.Queue()
self.recognized_words_queue = Queue.Queue()
event_period = rospy.get_param('~event_period', 0.5)
rospy.Timer(rospy.Duration(event_period), self.handle_trigger_events)
rospy.loginfo('\033[93m[%s]\033[0m initialized...'%rospy.get_name())
rospy.spin()
def __init__(self, parent_window):
# Service Names.
self.service_names = self.get_service_names()
self.main_label = Label(parent_window, text="Reset Rovio (NPOSE = 0)", font="Times 14 bold")
self.main_label.grid(row=0, columnspan=2)
# Init rovio button.
current_row = 1
self.init_rovio_button = Button(parent_window,
text = 'Reset Rovio',
command = self.init_rovio_button_handler)
self.init_rovio_button.grid(row = current_row, columnspan = 2)
# Init rovio service message.
current_row = 1
self.init_message_label = Label(parent_window, text ='', wraplength = 450)
self.init_message_label.grid(row = current_row, columnspan = 2)
# Init rovio service client.
self.init_rovio_srv = rospy.ServiceProxy(self.service_names['init_rovio'],
std_srvs.srv.Trigger)
def __init__(self):
self.eye_on_hand = rospy.get_param('eye_on_hand', False)
# tf names
self.robot_base_frame = rospy.get_param('robot_base_frame', 'base_link')
self.robot_effector_frame = rospy.get_param('robot_effector_frame', 'tool0')
self.tracking_base_frame = rospy.get_param('tracking_base_frame', 'optical_origin')
self.tracking_marker_frame = rospy.get_param('tracking_marker_frame', 'optical_target')
rospy.wait_for_service(hec.GET_SAMPLE_LIST_TOPIC)
self.get_sample_proxy = rospy.ServiceProxy(hec.GET_SAMPLE_LIST_TOPIC, hec.srv.TakeSample)
rospy.wait_for_service(hec.TAKE_SAMPLE_TOPIC)
self.take_sample_proxy = rospy.ServiceProxy(hec.TAKE_SAMPLE_TOPIC, hec.srv.TakeSample)
rospy.wait_for_service(hec.REMOVE_SAMPLE_TOPIC)
self.remove_sample_proxy = rospy.ServiceProxy(hec.REMOVE_SAMPLE_TOPIC, hec.srv.RemoveSample)
rospy.wait_for_service(hec.COMPUTE_CALIBRATION_TOPIC)
self.compute_calibration_proxy = rospy.ServiceProxy(hec.COMPUTE_CALIBRATION_TOPIC, hec.srv.ComputeCalibration)
rospy.wait_for_service(hec.SAVE_CALIBRATION_TOPIC)
self.save_calibration_proxy = rospy.ServiceProxy(hec.SAVE_CALIBRATION_TOPIC, std_srvs.srv.Empty)
def robot_listener(self):
'''
rospy.wait_for_service('spawn')
spawner = rospy.ServiceProxy('spawn', turtlesim.srv.Spawn)
spawner(4, 2, 0, 'turtle2')
'''
robot_vel_pub = rospy.Publisher('robot_0/cmd_vel', geometry_msgs.msg.Twist, queue_size=1)
rate = rospy.Rate(10.0)
while not rospy.is_shutdown():
try:
(trans,rot) = self.listener.lookupTransform('/robot_3', '/robot_0', rospy.Time(0))
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
continue
angular = 4 * math.atan2(trans[1], trans[0])
linear = 0.5 * math.sqrt(trans[0] ** 2 + trans[1] ** 2)
cmd = geometry_msgs.msg.Twist()
cmd.linear.x = linear
cmd.angular.z = angular
robot_vel_pub.publish(cmd)
rate.sleep()
def __init__(self):
rospy.init_node('gaze', anonymous=False)
self.lock = threading.RLock()
with self.lock:
self.current_state = GazeState.IDLE
self.last_state = self.current_state
# Initialize Variables
self.glance_timeout = 0
self.glance_timecount = 0
self.glance_played = False
self.idle_timeout = 0
self.idle_timecount = 0
self.idle_played = False
rospy.loginfo('\033[92m[%s]\033[0m waiting for bringup social_mind...'%rospy.get_name())
rospy.wait_for_service('environmental_memory/read_data')
rospy.wait_for_service('social_events_memory/read_data')
self.rd_memory = {}
self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData)
self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
rospy.Subscriber('raising_events', RaisingEvents, self.handle_raising_events)
rospy.Subscriber('gaze_focusing', String, self.handle_gaze_focusing)
self.pub_gaze_cmd = rospy.Publisher('gaze_command', GazeCommand, queue_size=10)
self.pub_viz_gaze_cmd = rospy.Publisher('visualization_gaze_cmd', PointStamped, queue_size=10)
rospy.Timer(rospy.Duration(GAZE_CONTROLLER_PERIOD), self.handle_gaze_controller)
rospy.loginfo('\033[92m[%s]\033[0m initialized...'%rospy.get_name())
rospy.spin()
def spawn_model(model_name):
""" Spawns a model in front of the RGBD camera.
Args: None
Returns: None
"""
initial_pose = Pose()
initial_pose.position.x = 0
initial_pose.position.y = 1
initial_pose.position.z = 1
# Spawn the new model #
model_path = rospkg.RosPack().get_path('sensor_stick')+'/models/'
model_xml = ''
with open (model_path + model_name + '/model.sdf', 'r') as xml_file:
model_xml = xml_file.read().replace('\n', '')
spawn_model_prox = rospy.ServiceProxy('gazebo/spawn_sdf_model', SpawnModel)
spawn_model_prox('training_model', model_xml, '', initial_pose, 'world')
def __init__(self, id, initialPosition, tf):
self.id = id
prefix = "/cf" + str(id)
self.initialPosition = np.array(initialPosition)
rospy.wait_for_service(prefix + "/upload_trajectory");
self.uploadTrajectoryService = rospy.ServiceProxy(prefix + "/upload_trajectory", UploadTrajectory)
rospy.wait_for_service(prefix + "/set_ellipse");
self.setEllipseService = rospy.ServiceProxy(prefix + "/set_ellipse", SetEllipse)
rospy.wait_for_service(prefix + "/takeoff")
self.takeoffService = rospy.ServiceProxy(prefix + "/takeoff", Takeoff)
rospy.wait_for_service(prefix + "/land")
self.landService = rospy.ServiceProxy(prefix + "/land", Land)
rospy.wait_for_service(prefix + "/hover")
self.hoverService = rospy.ServiceProxy(prefix + "/hover", Hover)
rospy.wait_for_service(prefix + "/avoid_target")
self.avoidTargetService = rospy.ServiceProxy(prefix + "/avoid_target", AvoidTarget)
rospy.wait_for_service(prefix + "/set_group")
self.setGroupService = rospy.ServiceProxy(prefix + "/set_group", SetGroup)
rospy.wait_for_service(prefix + "/update_params")
self.updateParamsService = rospy.ServiceProxy(prefix + "/update_params", UpdateParams)
self.tf = tf
self.prefix = prefix
def __init__(self):
# ROS initialization:
rospy.init_node('pose_manager')
self.poseLibrary = dict()
self.readInPoses()
self.poseServer = actionlib.SimpleActionServer("body_pose", BodyPoseAction,
execute_cb=self.executeBodyPose,
auto_start=False)
self.trajectoryClient = actionlib.SimpleActionClient("joint_trajectory", JointTrajectoryAction)
if self.trajectoryClient.wait_for_server(rospy.Duration(3.0)):
try:
rospy.wait_for_service("stop_walk_srv", timeout=2.0)
self.stopWalkSrv = rospy.ServiceProxy("stop_walk_srv", Empty)
except:
rospy.logwarn("stop_walk_srv not available, pose_manager will not stop the walker before executing a trajectory. "
+"This is normal if there is no nao_walker running.")
self.stopWalkSrv = None
self.poseServer.start()
rospy.loginfo("pose_manager running, offering poses: %s", self.poseLibrary.keys());
else:
rospy.logfatal("Could not connect to required \"joint_trajectory\" action server, is the nao_controller node running?");
rospy.signal_shutdown("Required component missing");
def test_timeout(self):
entered = MagicMock()
exited = MagicMock()
self.my_task.steps['timed'].entered_handler = entered
self.my_task.steps['timed'].exited_handler = exited
rospy.wait_for_service("mytask_step")
proxy = rospy.ServiceProxy(
"mytask_step",
StepTask
)
response = proxy(name='timed')
self.assertEqual(self.my_task.current_step.name, 'timed')
entered.assert_called_once()
rospy.sleep(0.2)
self.assertEqual(self.my_task.current_step.name, 'say_hello')
exited.assert_called_once()
# Test that the timer does in fact reset
rospy.sleep(0.2)
self.assertEqual(self.my_task.current_step.name, 'say_hello')
def request(self, uavcan_msg, node_id, callback, priority=None, timeout=None):
#pylint: disable=W0613
uavcan_type = uavcan.get_uavcan_data_type(uavcan_msg)
if not uavcan_type.full_name in self.__service_proxies:
self.__service_proxies[uavcan_type.full_name] = Service(uavcan_type.full_name).ServiceProxy()
service_proxy = self.__service_proxies[uavcan_type.full_name]
ros_req = copy_uavcan_ros(service_proxy.request_class(), uavcan_msg, request=True)
setattr(ros_req, uavcan_id_field_name, node_id)
def service_proxy_call():
try:
return service_proxy(ros_req)
except rospy.ServiceException:
return None
def request_finished(fut):
ros_resp = fut.result()
if ros_resp is None:
uavcan_resp = None
else:
uavcan_resp = uavcan_node._Event(self, uavcan_type, ros_resp, uavcan_id=node_id, request=False)
callback(uavcan_resp)
self.__request_executor.submit(service_proxy_call).add_done_callback(request_finished)
gripperalignment.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def ik_quaternion(self, quaternion_pose):
node = ("ExternalTools/{}/PositionKinematicsNode/"
"IKService".format(self.limb_name))
ik_service = rospy.ServiceProxy(node, SolvePositionIK)
ik_request = SolvePositionIKRequest()
ik_request.pose_stamp.append(quaternion_pose)
try:
rospy.loginfo('ik: waiting for IK service...')
rospy.wait_for_service(node, 5.0)
ik_response = ik_service(ik_request)
except (rospy.ServiceException, rospy.ROSException) as err:
rospy.logerr("ik_move: service request failed: %r" % err)
else:
if ik_response.isValid[0]:
rospy.loginfo("ik_move: valid joint configuration found")
limb_joints = dict(zip(ik_response.joints[0].name,
ik_response.joints[0].position))
return limb_joints
else:
rospy.logerr('ik_move: no valid configuration found')
return None
def __init__(self):
self.services = {'record': {'name': 'perception/record', 'type': Record}}
for service_name, service in self.services.items():
rospy.loginfo("Controller is waiting service {}...".format(service['name']))
rospy.wait_for_service(service['name'])
service['call'] = rospy.ServiceProxy(service['name'], service['type'])
# Buttons switches + LEDs
self.prefix = 'sensors'
self.button_leds_topics = ['button_leds/help', 'button_leds/pause']
self.buttons_topics = ['buttons/help', 'buttons/pause']
self.subscribers = [rospy.Subscriber('/'.join([self.prefix, topic]), Bool, lambda msg: self._cb_button_pressed(msg, topic)) for topic in self.buttons_topics]
self.publishers = {topic: rospy.Publisher('/'.join([self.prefix, topic]), Bool, queue_size=1) for topic in self.button_leds_topics}
self.button_leds_status = {topic: False for topic in self.button_leds_topics}
self.button_pressed = {topic: False for topic in self.buttons_topics}
self.last_press = {topic: rospy.Time(0) for topic in self.buttons_topics}
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'perception.json')) as f:
self.params = json.load(f)
def __init__(self):
self.services = {'set_iteration': {'name': 'learning/set_iteration', 'type': SetIteration},
'set_focus': {'name': 'learning/set_interest', 'type': SetFocus},
'assess': {'name': 'controller/assess', 'type': Assess},
'get_interests': {'name': 'learning/get_interests', 'type': GetInterests}}
rospy.Subscriber('learning/current_focus', String, self._cb_focus)
rospy.Subscriber('learning/user_focus', String, self._cb_user_focus)
rospy.Subscriber('learning/ready_for_interaction', Bool, self._cb_ready)
self.current_focus = ""
self.user_focus = ""
self.ready_for_interaction = False
for service_name, service in self.services.items():
rospy.loginfo("User node is waiting service {}...".format(service['name']))
rospy.wait_for_service(service['name'])
service['call'] = rospy.ServiceProxy(service['name'], service['type'])
rospy.loginfo("User node started!")
def _arm(self):
print(self.namespace, 'arming')
service_name = '%s/mavros/cmd/arming' % self.namespace
rospy.wait_for_service(service_name)
try:
service = rospy.ServiceProxy(service_name, CommandBool)
resp = service(True)
except rospy.ServiceException as e:
print(self.namespace, 'service call to arm failed:', str(e),
file=sys.stderr)
return False
if not resp.success:
print(self.namespace, 'failed to arm', file=sys.stderr)
return False
print(self.namespace, 'armed')
return True
def _return_home(self):
print(self.namespace, 'land')
req = CommandTOLRequest()
req.min_pitch = 0.0
req.yaw = -math.pi if self.color == 'blue' else 0.0
req.latitude = self.start_position.latitude
req.longitude = self.start_position.longitude
req.altitude = self.start_position.altitude
service_name = '%s/mavros/cmd/land' % self.namespace
rospy.wait_for_service(service_name)
try:
service = rospy.ServiceProxy(service_name, CommandTOL)
resp = service.call(req)
except rospy.ServiceException as e:
print(self.namespace, 'service call to land failed:', str(e),
file=sys.stderr)
return False
if not resp.success:
print(self.namespace, 'failed to land', file=sys.stderr)
return False
print(self.namespace, 'landing')
self._set_state(STATE_LANDING)
return True
def delete_model(mav_sys_id, vehicle_type, ros_master_uri=None):
if ros_master_uri:
original_uri = os.environ[ROS_MASTER_URI]
os.environ[ROS_MASTER_URI] = ros_master_uri
srv = ServiceProxy('/gazebo/delete_model', DeleteModel)
req = DeleteModelRequest()
unique_name = vehicle_type + '_' + str(mav_sys_id)
req.model_name = unique_name
resp = srv(req)
if ros_master_uri:
os.environ[ROS_MASTER_URI] = original_uri
if resp.success:
print(resp.status_message, '(%s)' % unique_name)
return 0
else:
print("failed to delete model [%s]: %s" %
(unique_name, resp.status_message), file=sys.stderr)
return 1
def compute_fk_client(self, group, joint_values, links):
rospy.wait_for_service('compute_fk')
try:
compute_fk = rospy.ServiceProxy('compute_fk', GetPositionFK)
header = Header()
header.stamp = rospy.Time.now()
header.frame_id = group.get_pose_reference_frame()
rs = RobotState()
rs.joint_state.header = header
rs.joint_state.name = group.get_active_joints()
rs.joint_state.position = joint_values
res = compute_fk(header, links, rs)
return res.pose_stamped
except rospy.ServiceException, e:
print "Service call failed: %s" % e
def __init__(self, name):
"""
Constructor.
@param name: camera identifier. You can get a list of valid
identifiers by calling the ROS service /cameras/list.
Expected names are right_hand_camera, left_hand_camera
and head_camera. However if the cameras are not
identified via the parameter server, they are simply
indexed starting at 0.
"""
self._id = name
self._open_svc = rospy.ServiceProxy('/cameras/open', OpenCamera)
self._close_svc = rospy.ServiceProxy('/cameras/close', CloseCamera)
self._settings = CameraSettings()
self._settings.width = 320
self._settings.height = 200
self._settings.fps = 20
self._open = False
def __init__(self, topic_prefix, config_file, robot_config_file, write_lock, bag_file):
self.topic_prefix = topic_prefix
self.test_config = config_file
self.robot_config_file = robot_config_file
resources_timer_frequency = 100.0 # Hz
self.timer_interval = 1/resources_timer_frequency
self.res_pipeline = {}
self.BfW = BagfileWriter(bag_file, write_lock)
rospy.loginfo("Waiting for obstacle_distance node...")
rospy.wait_for_service(self.robot_config_file["obstacle_distance"]["services"])
self.obstacle_distance_server = rospy.ServiceProxy(self.robot_config_file["obstacle_distance"]["services"],
GetObstacleDistance)
rospy.Timer(rospy.Duration.from_sec(self.timer_interval), self.collect_obstacle_distances)
def __init__(self, server):
self.server = server
self.mutex = Lock()
# Publisher to send commands
self.pub_command = rospy.Publisher("/motion_planning_goal", geometry_msgs.msg.Transform,
queue_size=1)
self.listener = tf.TransformListener()
# Subscribes to information about what the current joint values are.
rospy.Subscriber("/joint_states", sensor_msgs.msg.JointState,
self.joint_states_callback)
# Publisher to set robot position
self.pub_reset = rospy.Publisher("/joint_command", JointState, queue_size=1)
rospy.sleep(0.5)
# Wait for validity check service
rospy.wait_for_service("check_state_validity")
self.state_valid_service = rospy.ServiceProxy('check_state_validity',
moveit_msgs.srv.GetStateValidity)
self.reset_robot()
def main():
trans= 0
rot = 0
rospy.init_node('odom_sync')
listener = tf.TransformListener()
pubfiltered = rospy.Publisher('set_pose',PoseWithCovarianceStamped,queue_size = 10)
pubodom = rospy.Publisher('mobile_base/commands/reset_odometry', Empty, queue_size = 10)
serv = rospy.ServiceProxy("set_offset",SetOdomOffset)
rospy.Subscriber('/command_input',UInt8,callback,callback_args=(pubodom,pubfiltered,listener,trans,rot,serv))
rospy.sleep(1)
print "done sleeping"
while not rospy.is_shutdown():
try:
(trans,rot) = listener.lookupTransform('/base_meas', '/odom_meas',rospy.Time(0))
except: continue
rospy.spin()
def publish_to_ros(self, mode='transform', service_name='rigid_transforms/rigid_transform_publisher', namespace=None):
"""Publishes RigidTransform to ROS
If a transform referencing the same frames already exists in the ROS publisher, it is updated instead.
This checking is not order sensitive
Requires ROS rigid_transform_publisher service to be running. Assuming autolab_core is installed as a catkin package,
this can be done with: roslaunch autolab_core rigid_transforms.launch
Parameters
----------
mode : :obj:`str`
Mode in which to publish. In {'transform', 'frame'}
Defaults to 'transform'
service_name : string, optional
RigidTransformPublisher service to interface with. If the RigidTransformPublisher services are started through
rigid_transforms.launch it will be called rigid_transform_publisher
namespace : string, optional
Namespace to prepend to transform_listener_service. If None, current namespace is prepended.
Raises
------
rospy.ServiceException
If service call to rigid_transform_publisher fails
"""
if namespace == None:
service_name = rospy.get_namespace() + service_name
else:
service_name = namespace + service_name
rospy.wait_for_service(service_name, timeout = 10)
publisher = rospy.ServiceProxy(service_name, RigidTransformPublisher)
trans = self.translation
rot = self.quaternion
publisher(trans[0], trans[1], trans[2], rot[0], rot[1], rot[2], rot[3], self.from_frame, self.to_frame, mode)
def delete_from_ros(self, service_name='rigid_transforms/rigid_transform_publisher', namespace=None):
"""Removes RigidTransform referencing from_frame and to_frame from ROS publisher.
Note that this may not be this exact transform, but may that references the same frames (order doesn't matter)
Also, note that it may take quite a while for the transform to disappear from rigid_transform_publisher's cache
Requires ROS rigid_transform_publisher service to be running. Assuming autolab_core is installed as a catkin package,
this can be done with: roslaunch autolab_core rigid_transforms.launch
Parameters
----------
service_name : string, optional
RigidTransformPublisher service to interface with. If the RigidTransformPublisher services are started through
rigid_transforms.launch it will be called rigid_transform_publisher
namespace : string, optional
Namespace to prepend to transform_listener_service. If None, current namespace is prepended.
Raises
------
rospy.ServiceException
If service call to rigid_transform_publisher fails
"""
if namespace == None:
service_name = rospy.get_namespace() + service_name
else:
service_name = namespace + service_name
rospy.wait_for_service(service_name, timeout = 10)
publisher = rospy.ServiceProxy(service_name, RigidTransformPublisher)
publisher(0, 0, 0, 0, 0, 0, 0, self.from_frame, self.to_frame, 'delete')
def rigid_transform_from_ros(from_frame, to_frame, service_name='rigid_transforms/rigid_transform_listener', namespace=None):
"""Gets transform from ROS as a rigid transform
Requires ROS rigid_transform_publisher service to be running. Assuming autolab_core is installed as a catkin package,
this can be done with: roslaunch autolab_core rigid_transforms.launch
Parameters
----------
from_frame : :obj:`str`
to_frame : :obj:`str`
service_name : string, optional
RigidTransformListener service to interface with. If the RigidTransformListener services are started through
rigid_transforms.launch it will be called rigid_transform_listener
namespace : string, optional
Namespace to prepend to transform_listener_service. If None, current namespace is prepended.
Raises
------
rospy.ServiceException
If service call to rigid_transform_listener fails
"""
if namespace == None:
service_name = rospy.get_namespace() + service_name
else:
service_name = namespace + service_name
rospy.wait_for_service(service_name, timeout = 10)
listener = rospy.ServiceProxy(service_name, RigidTransformListener)
ret = listener(from_frame, to_frame)
quat = np.asarray([ret.w_rot, ret.x_rot, ret.y_rot, ret.z_rot])
trans = np.asarray([ret.x_trans, ret.y_trans, ret.z_trans])
rot = RigidTransform.rotation_from_quaternion(quat)
return RigidTransform(rotation=rot, translation=trans, from_frame=from_frame, to_frame=to_frame)
def fk_service_client(limb = "right"):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService"
fksvc = rospy.ServiceProxy(ns, SolvePositionFK)
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3',
'right_j4', 'right_j5', 'right_j6']
joints.position = [0.763331, 0.415979, -1.728629, 1.482985,
-1.135621, -1.674347, -0.496337]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
# Request forward kinematics from base to "right_hand" link
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(ns, 5.0)
resp = fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
# Check if result valid
if (resp.isValid[0]):
rospy.loginfo("SUCCESS - Valid Cartesian Solution Found")
rospy.loginfo("\nFK Cartesian Solution:\n")
rospy.loginfo("------------------")
rospy.loginfo("Response Message:\n%s", resp)
else:
rospy.logerr("INVALID JOINTS - No Cartesian Solution Found.")
return False
return True
simplemaze_turtlebot_camera_cvnn.py 文件源码
项目:ai-bs-summer17
作者: uchibe
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def __init__(self):
self.force_reset = True
self.vel_pub = rospy.Publisher('/mobile_base/commands/velocity', Twist, queue_size=5)
self.unpause = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
self.pause = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
self.reset_proxy = rospy.ServiceProxy('/gazebo/reset_simulation', Empty)
self.img_rows = 32
self.img_cols = 32
self.img_channels = 1
def __init__(self, robot_id):
srv_str = "/robot%s/replace" % robot_id
rospy.wait_for_service(srv_str)
self.move = rospy.ServiceProxy(srv_str, MoveRobot)
# TODO: handle rospy.service.ServiceException which can thrown from this
self.starting_random_positions = None
self.straight_section_poses = None
# reset bot to a random position
def _create_service_client(self, srv_name):
"""
Create a service client proxy
:param srv_name: Name of the service
"""
if self._srv:
self._srv.close()
if srv_name in rosservice.get_service_list():
rospy.loginfo("Creating proxy for service '%s'" % srv_name)
self._srv = rospy.ServiceProxy(srv_name, rosservice.get_service_class_by_name(srv_name))
def _create_service_client(self, srv_name):
"""
Method that creates a client service proxy to call either the GetFaceProperties.srv or the Recognize.srv
:param srv_name:
"""
if self._srv:
self._srv.close()
if srv_name in rosservice.get_service_list():
rospy.loginfo("Creating proxy for service '%s'" % srv_name)
self._srv = rospy.ServiceProxy(srv_name, rosservice.get_service_class_by_name(srv_name))