def __init__(self):
self.run_recognition = rospy.get_param('/rgb_object_detection/run_recognition')
self.model_filename = rospy.get_param('/rgb_object_detection/model_file')
self.weights_filename = rospy.get_param('/rgb_object_detection/weights_file')
self.categories_filename = rospy.get_param('/rgb_object_detection/category_file')
self.verbose = rospy.get_param('/rgb_object_detection/verbose', False)
self.bridge = CvBridge()
self.image_sub = rospy.Subscriber('/camera/rgb/image_color', Image, self.img_cb)
self.patches_sub = rospy.Subscriber('/candidate_regions_depth', PolygonStamped, self.patches_cb)
self.detection_pub = rospy.Publisher('/detections', Detection, queue_size=1)
# you can read this value off of your sensor from the '/camera/depth_registered/camera_info' topic
self.P = np.array([[525.0, 0.0, 319.5, 0.0], [0.0, 525.0, 239.5, 0.0], [0.0, 0.0, 1.0, 0.0]])
if self.run_recognition:
self.cnn = CNN('', self.model_filename, self.weights_filename, self.categories_filename, '', 0, 0, self.verbose)
self.cnn.load_model()
python类get_param()的实例源码
def __init__(self):
rospy.init_node('multiplexer_node', anonymous=False)
rospy.loginfo('\033[93m[%s]\033[0m wait for bringup social_events_memory...'%rospy.get_name())
rospy.wait_for_service('social_events_memory/read_data')
self.rd_event_mem = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
rospy.Subscriber('forwarding_event', ForwardingEvent, self.handle_social_events)
self.pub_rasing_event = rospy.Publisher('raising_events', RaisingEvents, queue_size=10)
self.events_queue = Queue.Queue()
self.recognized_words_queue = Queue.Queue()
event_period = rospy.get_param('~event_period', 0.5)
rospy.Timer(rospy.Duration(event_period), self.handle_trigger_events)
rospy.loginfo('\033[93m[%s]\033[0m initialized...'%rospy.get_name())
rospy.spin()
def __init__(self):
# first let's load all parameters:
self.frame_id = rospy.get_param("~frame_id", "odom_meas")
self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame")
self.base_frame_id = rospy.get_param("~base_frame_id", "base_meas")
self.x0 = rospy.get_param("~x0", 0.0)
self.y0 = rospy.get_param("~y0", 0.0)
self.th0 = rospy.get_param("~th0", 0.0)
self.pubstate = rospy.get_param("~pubstate", True)
self.marker_id = rospy.get_param("~marker_id", 12)
# setup other required vars:
self.odom_offset = odom_conversions.numpy_to_odom(np.array([self.x0, self.y0, self.th0]),
self.frame_id)
self.path_list = deque([], maxlen=PATH_LEN)
# now let's create publishers, listeners, and subscribers
self.br = tf.TransformBroadcaster()
self.listener = tf.TransformListener()
self.meas_pub = rospy.Publisher("meas_pose", Odometry, queue_size=5)
self.path_pub = rospy.Publisher("meas_path", Path, queue_size=1)
self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb)
self.publish_serv = rospy.Service("publish_bool", SetBool, self.pub_bool_srv_cb)
self.offset_serv = rospy.Service("set_offset", SetOdomOffset, self.offset_srv_cb)
return
def __init__(self):
"""
Create HeadLiftJoy object.
"""
# params
self._head_axes = rospy.get_param('~head_axes', 3)
self._lift_axes = rospy.get_param('~lift_axes', 3)
self._head_button = rospy.get_param('~head_button', 4)
self._lift_button = rospy.get_param('~lift_button', 5)
# pubs
self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1)
self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1)
# subs
self._joy_sub = rospy.Subscriber('joy', Joy, self._joy_cb, queue_size=1)
def _read_unit_offsets(self):
if not rospy.has_param('~num_of_units'):
rospy.logwarn("No unit offset parameters found!")
num_of_units = rospy.get_param('~num_of_units', 0)
self._unit_offsets = np.zeros((num_of_units, 3))
self._unit_coefficients = np.zeros((num_of_units, 2))
for i in xrange(num_of_units):
unit_params = rospy.get_param('~unit_{}'.format(i))
x = unit_params['x']
y = unit_params['y']
z = unit_params['z']
self._unit_offsets[i, :] = [x, y, z]
p0 = unit_params['p0']
p1 = unit_params['p1']
self._unit_coefficients[i, :] = [p0, p1]
rospy.loginfo("Unit offsets: {}".format(self._unit_offsets))
rospy.loginfo("Unit coefficients: {}".format(self._unit_coefficients))
def get_topic_service_names(self):
topic_names = {}
service_names = {}
# Topics.
topic_names['msf_odometry'] = rospy.get_param('~msf_odometry_topic', 'msf_core/odometry')
# Services.
service_names['init_msf_height'] = rospy.get_param('~init_msf_height_srv',
'pose_sensor_rovio/pose_sensor/initialize_msf_height')
service_names['init_msf_scale'] = rospy.get_param('~init_msf_scale_srv',
'pose_sensor_rovio/pose_sensor/initialize_msf_scale')
# Check if we should add a leading namespace
name_space = rospy.get_param('~namespace', '')
for key, value in topic_names.iteritems():
topic_names[key] = helpers.get_full_namespace(name_space, value)
for key, value in service_names.iteritems():
service_names[key] = helpers.get_full_namespace(name_space, value)
return topic_names, service_names
def get_topic_names(self):
# RTK info topics
topic_names = {}
topic_names['piksi_receiver_state'] = rospy.get_param('~piksi_receiver_state_topic',
'piksi/debug/receiver_state')
topic_names['piksi_uart_state'] = rospy.get_param('~piksi_uart_state_topic',
'piksi/debug/uart_state')
topic_names['piksi_baseline_ned'] = rospy.get_param('~piksi_baseline_ned_topic',
'piksi/baseline_ned')
topic_names['piksi_wifi_corrections'] = rospy.get_param('~piksi_num_wifi_corrections_topic',
'piksi/debug/wifi_corrections')
topic_names['piksi_navsatfix_rtk_fix'] = rospy.get_param('~piksi_navsatfix_rtk_fix',
'piksi/navsatfix_rtk_fix')
topic_names['piksi_age_of_corrections'] = rospy.get_param('~piksi_age_of_corrections_topic',
'piksi/age_of_corrections')
# Check if we should add a leading namespace
name_space = rospy.get_param('~namespace', '')
for key, value in topic_names.iteritems():
topic_names[key] = helpers.get_full_namespace(name_space, value)
return topic_names
def __init__(self):
if rospy.has_param('~orientation_offset'):
# Orientation offset as quaterion q = [x,y,z,w].
self.orientation_offset = rospy.get_param('~orientation_offset')
else:
yaw_offset_deg = rospy.get_param('~yaw_offset_deg', 0.0)
self.orientation_offset = tf.quaternion_from_euler(0.0, 0.0, math.radians(yaw_offset_deg))
rospy.Subscriber(rospy.get_name() + "/imu_in", Imu, self.imu_callback)
self.pub_imu_out = rospy.Publisher(rospy.get_name() + '/imu_out',
Imu, queue_size=10)
rospy.spin()
def init(self):
pygame.init()
clock = pygame.time.Clock()
screen = pygame.display.set_mode((250, 250))
rospy.init_node('highway_teleop')
self.rate = rospy.Rate(rospy.get_param('~hz', 10))
self.acc = rospy.get_param('~acc', 5)
self.yaw = rospy.get_param('~yaw', 0)
self.robot_pub = rospy.Publisher('robot_0/cmd_vel', Twist, queue_size=1)
print "Usage: \n \
up arrow: accelerate \n \
down arrow: decelerate \n \
left arrow: turn left \n \
right arrow: turn right"
def init(self):
pygame.init()
clock = pygame.time.Clock()
screen = pygame.display.set_mode((250, 250))
rospy.init_node('teleop')
self.rate = rospy.Rate(rospy.get_param('~hz', 20))
self.acc = rospy.get_param('~acc', 1)
self.yaw = rospy.get_param('~yaw', 0.25)
self.robot_pub = rospy.Publisher('control_command', controlCommand, queue_size=1)
self.path_record_pub = rospy.Publisher('record_state', \
RecordState, queue_size=1)
self.highway_game_start_pub = rospy.Publisher('highway_game_start', RecordState, queue_size=1)
print "Usage: \n \
up arrow: accelerate \n \
down arrow: decelerate \n \
left arrow: turn left \n \
right arrow: turn right"
def from_parameters(self):
"""
Stores this calibration as ROS parameters in the namespace of the current node.
:rtype: None
"""
calib_dict = {}
root_params = ['eye_on_hand', 'tracking_base_frame']
for rp in root_params:
calib_dict[rp] = rospy.get_param(rp)
if calib_dict['eye_on_hand']:
calib_dict['robot_effector_frame'] = rospy.get_param('robot_effector_frame')
else:
calib_dict['robot_base_frame'] = rospy.get_param('robot_base_frame')
transf_params = 'x', 'y', 'z', 'qx', 'qy', 'qz', 'qw'
calib_dict['transformation'] = {}
for tp in transf_params:
calib_dict['transformation'][tp] = rospy.get_param('transformation/'+tp)
self.from_dict(calib_dict)
def __init__(self):
self.eye_on_hand = rospy.get_param('eye_on_hand', False)
# tf names
self.robot_base_frame = rospy.get_param('robot_base_frame', 'base_link')
self.robot_effector_frame = rospy.get_param('robot_effector_frame', 'tool0')
self.tracking_base_frame = rospy.get_param('tracking_base_frame', 'optical_origin')
self.tracking_marker_frame = rospy.get_param('tracking_marker_frame', 'optical_target')
rospy.wait_for_service(hec.GET_SAMPLE_LIST_TOPIC)
self.get_sample_proxy = rospy.ServiceProxy(hec.GET_SAMPLE_LIST_TOPIC, hec.srv.TakeSample)
rospy.wait_for_service(hec.TAKE_SAMPLE_TOPIC)
self.take_sample_proxy = rospy.ServiceProxy(hec.TAKE_SAMPLE_TOPIC, hec.srv.TakeSample)
rospy.wait_for_service(hec.REMOVE_SAMPLE_TOPIC)
self.remove_sample_proxy = rospy.ServiceProxy(hec.REMOVE_SAMPLE_TOPIC, hec.srv.RemoveSample)
rospy.wait_for_service(hec.COMPUTE_CALIBRATION_TOPIC)
self.compute_calibration_proxy = rospy.ServiceProxy(hec.COMPUTE_CALIBRATION_TOPIC, hec.srv.ComputeCalibration)
rospy.wait_for_service(hec.SAVE_CALIBRATION_TOPIC)
self.save_calibration_proxy = rospy.ServiceProxy(hec.SAVE_CALIBRATION_TOPIC, std_srvs.srv.Empty)
joint_trajectory_file_playback.py 文件源码
项目:intera_sdk
作者: RethinkRobotics
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def wait(self):
"""
Waits for and verifies trajectory execution result
"""
#create a timeout for our trajectory execution
#total time trajectory expected for trajectory execution plus a buffer
last_time = self.goal.trajectory.points[-1].time_from_start.to_sec()
time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
timeout = rospy.Duration(self._slow_move_offset +
last_time +
time_buffer)
finish = self._limb_client.wait_for_result(timeout)
result = (self._limb_client.get_result().error_code == 0)
#verify result
if all([finish, result]):
return True
else:
msg = ("Trajectory action failed or did not finish before "
"timeout/interrupt.")
rospy.logwarn(msg)
return False
def get_camera_details(self):
"""
Return the details of the cameras.
@rtype: list [str]
@return: ordered list of camera details
"""
camera_dict = dict()
camera_config_param = "/robot_config/camera_config"
try:
camera_dict = rospy.get_param(camera_config_param)
except KeyError:
rospy.logerr("RobotParam:get_camera_details cannot detect any "
"cameras under the parameter {0}".format(camera_config_param))
except (socket.error, socket.gaierror):
self._log_networking_error()
return camera_dict
def get_robot_assemblies(self):
"""
Return the names of the robot's assemblies from ROS parameter.
@rtype: list [str]
@return: ordered list of assembly names
(e.g. right, left, torso, head). on networked robot
"""
assemblies = list()
try:
assemblies = rospy.get_param("/robot_config/assembly_names")
except KeyError:
rospy.logerr("RobotParam:get_robot_assemblies cannot detect assembly names"
" under param /robot_config/assembly_names")
except (socket.error, socket.gaierror):
self._log_networking_error()
return assemblies
def get_joint_names(self, limb_name):
"""
Return the names of the joints for the specified
limb from ROS parameter.
@type limb_name: str
@param limb_name: name of the limb for which to retrieve joint names
@rtype: list [str]
@return: ordered list of joint names from proximal to distal
(i.e. shoulder to wrist). joint names for limb
"""
joint_names = list()
try:
joint_names = rospy.get_param(
"robot_config/{0}_config/joint_names".format(limb_name))
except KeyError:
rospy.logerr(("RobotParam:get_joint_names cannot detect joint_names for"
" arm \"{0}\"").format(limb_name))
except (socket.error, socket.gaierror):
self._log_networking_error()
return joint_names
def move_to_neutral(self, timeout=15.0, speed=0.3):
"""
Command the Limb joints to a predefined set of "neutral" joint angles.
From rosparam named_poses/<limb>/poses/neutral.
@type timeout: float
@param timeout: seconds to wait for move to finish [15]
@type speed: float
@param speed: ratio of maximum joint speed for execution
default= 0.3; range= [0.0-1.0]
"""
try:
neutral_pose = rospy.get_param("named_poses/{0}/poses/neutral".format(self.name))
except KeyError:
rospy.logerr(("Get neutral pose failed, arm: \"{0}\".").format(self.name))
return
angles = dict(zip(self.joint_names(), neutral_pose))
self.set_joint_position_speed(speed)
return self.move_to_joint_positions(angles, timeout)
def __init__(self):
# initialize a socket
self.host = rospy.get_param('~server_address', '127.0.0.1')
self.port = rospy.get_param('~server_port', 20011)
self.friend_color = rospy.get_param('/friend_color', 'yellow')
rospy.loginfo('server address is set to [' + self.host + ']')
rospy.loginfo('server port is set to [' + str(self.port) + ']')
rospy.loginfo('team color is set to [' + self.friend_color + ']')
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# make subscribers
self.subscribers = []
for i in xrange(12):
topic = "/robot_" + str(i) + "/robot_commands"
self.subscribers.append(
rospy.Subscriber(
topic,
robot_commands,
self.sendCommands,
callback_args=i))
def __init__(self):
rospy.init_node('fake_renderer', anonymous=True)
try:
topic_name = rospy.get_param('~action_name')
except KeyError as e:
print('[ERROR] Needed parameter for (topic_name)...')
quit()
if 'render_gesture' in rospy.get_name():
self.GetInstalledGesturesService = rospy.Service(
"get_installed_gestures",
GetInstalledGestures,
self.handle_get_installed_gestures
)
self.motion_list = {
'neutral': ['neutral_motion1'],
'encourge': ['encourge_motion1'],
'attention': ['attention_motion1'],
'consolation': ['consolation_motion1'],
'greeting': ['greeting_motion1'],
'waiting': ['waiting_motion1'],
'advice': ['advice_motion1'],
'praise': ['praise_motion1'],
'command': ['command_motion1'],
}
self.server = actionlib.SimpleActionServer(
topic_name, RenderItemAction, self.execute_callback, False)
self.server.start()
rospy.loginfo('[%s] initialized...' % rospy.get_name())
rospy.spin()
def _read_unit_offsets(self):
if not rospy.has_param('~num_of_units'):
rospy.logwarn("No unit offset parameters found!")
num_of_units = rospy.get_param('~num_of_units', 0)
self._unit_offsets = np.zeros((num_of_units, 3))
self._unit_coefficients = np.zeros((num_of_units, 2))
for i in xrange(num_of_units):
unit_params = rospy.get_param('~unit_{}'.format(i))
x = unit_params['x']
y = unit_params['y']
z = unit_params['z']
self._unit_offsets[i, :] = [x, y, z]
p0 = unit_params['p0']
p1 = unit_params['p1']
self._unit_coefficients[i, :] = [p0, p1]
rospy.loginfo("Unit offsets: {}".format(self._unit_offsets))
rospy.loginfo("Unit coefficients: {}".format(self._unit_coefficients))
def __init__(self, image_path=None):
height = int(rospy.get_param("~grid_height", 800))
width = int(rospy.get_param("~grid_width", 800))
resolution = rospy.get_param("~grid_resolution", .25)
ogrid_topic = rospy.get_param("/lqrrt_node/ogrid_topic", "/ogrid")
self.grid_drawer = DrawGrid(height, width, image_path)
self.ogrid_pub = rospy.Publisher(ogrid_topic, OccupancyGrid, queue_size=1)
m = MapMetaData()
m.resolution = resolution
m.width = width
m.height = height
pos = np.array([-width * resolution / 2, -height * resolution / 2, 0])
quat = np.array([0, 0, 0, 1])
m.origin = Pose()
m.origin.position.x, m.origin.position.y = pos[:2]
self.map_meta_data = m
rospy.Timer(rospy.Duration(1), self.pub_grid)
def save_recipe_dp(self, variable):
"""
Save the recipe start/end to the env. data pt. DB, so we can restart
the recipe if necessary.
"""
doc = EnvironmentalDataPoint({
"environment": self.environment,
"variable": variable,
"is_desired": True,
"value": rospy.get_param(params.CURRENT_RECIPE),
"timestamp": rospy.get_time()
})
doc_id = gen_doc_id(rospy.get_time())
self.env_data_db[doc_id] = doc
#------------------------------------------------------------------------------
# Our ROS node main entry point. Starts up the node and then waits forever.
def test_valid_variables(self):
ENVIRONMENTAL_VARIABLES = frozenset(
VariableInfo.from_dict(d)
for d in rospy.get_param("/var_types/environment_variables").itervalues())
RECIPE_VARIABLES = frozenset(
VariableInfo.from_dict(d)
for d in rospy.get_param("/var_types/recipe_variables").itervalues())
VALID_VARIABLES = ENVIRONMENTAL_VARIABLES.union(RECIPE_VARIABLES)
print(VALID_VARIABLES)
assert len(VALID_VARIABLES) == 19
# This builds a dictionary of publisher instances using a
# "dictionary comprehension" (syntactic sugar for building dictionaries).
# The constant has to be declared here because get_message_class
# needs to be called after the node is initialized.
PUBLISHERS = {
variable.name: rospy.Publisher(
"{}/desired".format(variable.name),
get_message_class(variable.type),
queue_size=10)
for variable in VALID_VARIABLES
}
def test_method_generate_digest(self):
key = 'hello_world'
entry = rospy.get_param(self.server.speech_key_to_param(key))
parsed = self.server.parse_template(entry['template'], entry['params'])
digest_a = self.server.generate_digest(key, parsed)
self.assertIsNotNone(digest_a)
self.assertIsInstance(digest_a, str)
parsed = self.server.parse_template(
entry['template'],
_.assign(entry['params'], {'name': 'Needy'}))
digest_b = self.server.generate_digest(key, parsed)
self.assertIsNotNone(digest_b, 'is none')
self.assertIsInstance(digest_b, str, 'is not a string')
self.assertNotEqual(digest_a, digest_b,
'equal digests with diff params')
def test_method_generate_cache_payload(self):
digest = self.server.generate_digest(
'hello_world',
'Hello, world! My name is a needy robot.')
expected = {
'key': 'hello_world',
'hash': digest,
'file': os.path.join(
self.server._cache_dir,
'{}.{}'.format(digest, self.server.voice.codec)),
'effects': ' '.join(self.server.effects),
'voice': {
'name': self.server.voice.voice_name,
'speech_rate': self.server.voice.speech_rate,
'codec': self.server.voice.codec
},
'template': rospy.get_param(
self.server.speech_key_to_param('hello_world'))['template'],
'params': {
'name': 'a needy robot'
}
}
payload = self.server.generate_cache_payload(self.action)
self.assertIsNotNone(payload, 'no payload created')
self.assertEqual(payload, expected, 'generated payload is incorrect')
def warm_cache(self):
"""
Collects all dialog in your manifest and stores them in the cache
directory as well as creating entries in the cache manifest.
"""
rospy.loginfo('Warming speech cache...')
m = rospy.get_param('/needybot/speech/dialog')
for k, v in m.iteritems():
goal = SpeechGoal(key=k)
gen = self.generate_cache_payload(goal)
payload = self.fetch_from_cache(k, gen.get('hash'))
if payload is None:
payload = gen
if self.validate_payload(payload):
self.add_to_cache(payload)
self.generate_raw_speech(payload)
else:
raise TypeError('Invalid payload send to speech server')
rospy.loginfo('Speech cache is ready.')
self.warmed_pub.publish()
def __init__(self):
"""Constructor for the class
initialize topic subscription and
instance variables
"""
self.r = rospy.Rate(5)
self.publisher = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
rospy.Subscriber('/scan', LaserScan, self.process_scan)
# user chooses which parking mode to be performed
self.is_parallel = rospy.get_param('~parallel', False)
# Instance Variables
self.timestamp1 = None
self.timestamp2 = None
self.dist2Neato = None
self.dist2Wall = None
self.widthOfSpot = None
self.twist = None
self.radius = None
# Adjusment to be made before moving along the arc
self.adjustment = 0
self.isAligned = False
def __init__(self, name, blackboard):
"""
Tree task responsible for spawning new tasks. Once a task has a status of completed of failure
this branch of the tree is executed. The timer waits for 5 seconds before sending the final
complete signal to the running task and spawning a new one.
Args:
name(str): Name of this behavior tree task for identification.
blackboard(Blackboard): Behavior tree blackboard that stores global vars.
"""
super(Spawner, self).__init__(name, blackboard)
self.timer = None
self.spawning = False
self.spawn_delay = rospy.get_param("~spawn-delay", 0.0)
def __init__(self, frame, transform_module=None):
rospy.loginfo("Zarj eyes initialization begin")
self.bridge = CvBridge()
if transform_module is not None:
self.tf = transform_module
else:
self.tf = tfzarj.TfZarj(rospy.get_param('/ihmc_ros/robot_name'))
self.tf.init_transforms()
self.detection_requests = []
self.frame_id = frame
self.image_sub_left = None
self.image_sub_cloud = None
rospy.loginfo("Zarj eyes initialization finished")
def lookupFeet(name, tfBuffer):
global lf_start_position
global lf_start_orientation
global rf_start_position
global rf_start_orientation
lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)
if rospy.has_param(rfp) and rospy.has_param(lfp):
lfname = rospy.get_param(lfp)
rfname = rospy.get_param(rfp)
leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
lf_start_orientation = leftFootWorld.transform.rotation
lf_start_position = leftFootWorld.transform.translation
rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
rf_start_orientation = rightFootWorld.transform.rotation
rf_start_position = rightFootWorld.transform.translation