def lookupFeet(name, tfBuffer):
global lf_start_position
global lf_start_orientation
global rf_start_position
global rf_start_orientation
lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)
if rospy.has_param(rfp) and rospy.has_param(lfp):
lfname = rospy.get_param(lfp)
rfname = rospy.get_param(rfp)
leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
lf_start_orientation = leftFootWorld.transform.rotation
lf_start_position = leftFootWorld.transform.translation
rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
rf_start_orientation = rightFootWorld.transform.rotation
rf_start_position = rightFootWorld.transform.translation
python类get_param()的实例源码
def lookupFeet(name, tfBuffer):
global lf_start_position
global lf_start_orientation
global rf_start_position
global rf_start_orientation
lfp = "/ihmc_ros/{0}/left_foot_frame_name".format(name)
rfp = "/ihmc_ros/{0}/right_foot_frame_name".format(name)
if rospy.has_param(rfp) and rospy.has_param(lfp):
lfname = rospy.get_param(lfp)
rfname = rospy.get_param(rfp)
leftFootWorld = tfBuffer.lookup_transform('world', lfname, rospy.Time())
lf_start_orientation = leftFootWorld.transform.rotation
lf_start_position = leftFootWorld.transform.translation
rightFootWorld = tfBuffer.lookup_transform('world', rfname, rospy.Time())
rf_start_orientation = rightFootWorld.transform.rotation
rf_start_position = rightFootWorld.transform.translation
joint_trajectory_file_playback.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def wait(self):
"""
Waits for and verifies trajectory execution result
"""
#create a timeout for our trajectory execution
#total time trajectory expected for trajectory execution plus a buffer
last_time = self._r_goal.trajectory.points[-1].time_from_start.to_sec()
time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
timeout = rospy.Duration(self._slow_move_offset +
last_time +
time_buffer)
l_finish = self._left_client.wait_for_result(timeout)
r_finish = self._right_client.wait_for_result(timeout)
l_result = (self._left_client.get_result().error_code == 0)
r_result = (self._right_client.get_result().error_code == 0)
#verify result
if all([l_finish, r_finish, l_result, r_result]):
return True
else:
msg = ("Trajectory action failed or did not finish before "
"timeout/interrupt.")
rospy.logwarn(msg)
return False
def __init__(self):
token = rospy.get_param('/telegram/token', None)
if token is None:
rospy.logerr("No token found in /telegram/token param server.")
exit(0)
else:
rospy.loginfo("Got telegram bot token from param server.")
# Set CvBridge
self.bridge = CvBridge()
# Create the EventHandler and pass it your bot's token.
updater = Updater(token)
# Get the dispatcher to register handlers
dp = updater.dispatcher
# on noncommand i.e message - echo the message on Telegram
dp.add_handler(MessageHandler(Filters.text, self.pub_received))
# log all errors
dp.add_error_handler(self.error)
# Start the Bot
updater.start_polling()
def __init__(self):
self.base_pub = rospy.Publisher("/base_controller/command", Twist,
queue_size=1)
token = rospy.get_param('/telegram/token', None)
# Create the Updater and pass it your bot's token.
updater = Updater(token)
# Add command and error handlers
updater.dispatcher.add_handler(CommandHandler('start', self.start))
updater.dispatcher.add_handler(CommandHandler('help', self.help))
updater.dispatcher.add_handler(MessageHandler(Filters.text, self.echo))
updater.dispatcher.add_error_handler(self.error)
# Start the Bot
updater.start_polling()
def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
self.params = json.load(f)
self.publish_rate = rospy.Rate(self.params['publish_rate'])
self.demo = rospy.get_param('demo_mode')
# Protected resources
self.in_rest_pose = False
self.robot_lock = RLock()
# Used services
self.torso = TorsoServices(self.params['robot_name'])
# Proposed services
self.reset_srv_name = 'torso/reset'
self.reset_srv = None
def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'general.json')) as f:
self.params = json.load(f)
with open(join(self.rospack.get_path('apex_playground'), 'config', 'torso.json')) as f:
self.torso_params = json.load(f)
self.outside_ros = rospy.get_param('/use_sim_time', False) # True if work manager <-> controller comm must use ZMQ
id = search(r"(\d+)", rospy.get_namespace())
self.worker_id = 0 if id is None else int(id.groups()[0]) # TODO string worker ID
self.work = WorkManager(self.worker_id, self.outside_ros)
self.torso = Torso()
self.ergo = Ergo()
self.learning = Learning()
self.perception = Perception()
self.recorder = Recorder()
self.demonstrate = "" # Skill (Target space for Produce) or empty string if not running assessment
# Served services
self.service_name_demonstrate = "controller/assess"
rospy.Service(self.service_name_demonstrate, Assess, self.cb_assess)
rospy.loginfo('Controller fully started!')
def __init__(self):
self.rospack = RosPack()
self.port = rospy.get_param('ui_port', 80 if os.getuid() == 0 else 5000)
self.web_app_root = join(self.rospack.get_path('apex_playground'), 'webapp', 'static')
self.app = Flask(__name__, static_url_path='', static_folder=self.web_app_root)
self.cors = CORS(self.app, resources={r'/api/*': {'origins': '*'}})
self.services = UserServices()
self.window_length = 500
self.display_point_interval = 1
self.app.route('/')(self.root)
self.app.route('/api/interests', methods=['GET'])(self.experiment_status)
self.app.route('/api/focus', methods=['POST'])(self.update_focus)
self.app.route('/api/time-travel', methods=['POST'])(self.time_travel)
self.app.route('/api/reset', methods=['POST'])(self.reset)
self.app.route('/api/assessment', methods=['POST'])(self.update_assessment)
def __init__(self):
self.marker_id = rospy.get_param("~marker_id", 12)
self.frame_id = rospy.get_param("~frame_id", "odom_meas")
self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame")
self.count = rospy.get_param("~count", 50)
# local vars:
self.calibrate_flag = False
self.calibrated = False
self.calibrate_count = 0
self.kb = kbhit.KBHit()
self.trans_arr = np.zeros((self.count, 3))
self.quat_arr = np.zeros((self.count, 4))
self.trans_ave = np.zeros(3)
self.quat_ave = np.zeros(3)
# now create subscribers, timers, and publishers
self.br = tf.TransformBroadcaster()
self.kb_timer = rospy.Timer(rospy.Duration(0.1), self.keycb)
rospy.on_shutdown(self.kb.set_normal_term)
self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb)
return
def _init_params(self):
self.port = rospy.get_param('~port', self.default_port)
self.robot_type = rospy.get_param('~robot_type', 'create')
#self.baudrate = rospy.get_param('~baudrate', self.default_baudrate)
self.update_rate = rospy.get_param('~update_rate', self.default_update_rate)
self.drive_mode = rospy.get_param('~drive_mode', 'twist')
self.has_gyro = rospy.get_param('~has_gyro', True)
self.odom_angular_scale_correction = rospy.get_param('~odom_angular_scale_correction', 1.0)
self.odom_linear_scale_correction = rospy.get_param('~odom_linear_scale_correction', 1.0)
self.cmd_vel_timeout = rospy.Duration(rospy.get_param('~cmd_vel_timeout', 0.6))
self.stop_motors_on_bump = rospy.get_param('~stop_motors_on_bump', True)
self.min_abs_yaw_vel = rospy.get_param('~min_abs_yaw_vel', None)
self.max_abs_yaw_vel = rospy.get_param('~max_abs_yaw_vel', None)
self.publish_tf = rospy.get_param('~publish_tf', False)
self.odom_frame = rospy.get_param('~odom_frame', 'odom')
self.base_frame = rospy.get_param('~base_frame', 'base_footprint')
self.operate_mode = rospy.get_param('~operation_mode', 3)
rospy.loginfo("serial port: %s"%(self.port))
rospy.loginfo("update_rate: %s"%(self.update_rate))
rospy.loginfo("drive mode: %s"%(self.drive_mode))
rospy.loginfo("has gyro: %s"%(self.has_gyro))
def __init__(self):
self.frame_id = rospy.get_param("~frame_id", "map")
cov_x = rospy.get_param("~cov_x", 0.6)
cov_y = rospy.get_param("~cov_y", 0.6)
cov_z = rospy.get_param("~cov_z", 0.6)
self.cov = self.cov_matrix(cov_x, cov_y, cov_z)
self.ps_pub = rospy.Publisher(
POSE_TOPIC, PoseStamped, queue_size=1)
self.ps_cov_pub = rospy.Publisher(
POSE_COV_TOPIC, PoseWithCovarianceStamped, queue_size=1)
self.ps_pub_3d = rospy.Publisher(
POSE_TOPIC_3D, PoseStamped, queue_size=1)
self.ps_cov_pub_3d = rospy.Publisher(
POSE_COV_TOPIC_3D, PoseWithCovarianceStamped, queue_size=1)
self.last = None
self.listener = tf.TransformListener()
self.tag_range_topics = rospy.get_param("~tag_range_topics")
self.subs = list()
self.ranges = dict()
self.tag_pos = dict()
self.altitude = 0.0
self.last_3d = None
for topic in self.tag_range_topics:
self.subs.append(rospy.Subscriber(topic, Range, self.range_cb))
def __init__(self):
# setup
CozmoTeleop.settings = termios.tcgetattr(sys.stdin)
atexit.register(self.reset_terminal)
# vars
self.head_angle = STD_HEAD_ANGLE
self.lift_height = STD_LIFT_HEIGHT
# params
self.lin_vel = rospy.get_param('~lin_vel', 0.2)
self.ang_vel = rospy.get_param('~ang_vel', 1.5757)
# pubs
self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1)
self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1)
self._cmd_vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=1)
def __init__(self,dis=0.0):
State.__init__(self, outcomes=['succeeded','preempted','aborted'],input_keys=['target'])
rospy.on_shutdown(self.shutdown)
self.test_distance = target.y-dis
self.rate = 200
self.r = rospy.Rate(self.rate)
self.speed = rospy.get_param('~speed',0.08)
self.tolerance = rospy.get_param('~tolerance', 0.01)
self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
self.tf_listener = tf.TransformListener()
rospy.sleep(1)
self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
#define a bianliang
self.flag = rospy.get_param('~flag', True)
rospy.loginfo(self.test_distance)
#tf get position
#publish cmd_vel
def __init__(self,angle=0):
State.__init__(self, outcomes=['succeeded','aborted','preempted'],input_keys=["angle_in"])
self.speed = rospy.get_param('~speed', 0.10)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self, angle=0):
State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'], input_keys=["angle_in"])
self.speed = rospy.get_param('~speed', 0.03)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle=angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self, angle=0):
State.__init__(self, outcomes=['succeeded', 'aborted', 'preempted'])
self.speed = rospy.get_param('~speed', 0.03)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle=angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self,dis=0):
State.__init__(self, outcomes=['succeeded','preempted','aborted'])
rospy.on_shutdown(self.shutdown)
self.test_distance = dis
self.rate = 100
self.r = rospy.Rate(self.rate)
self.speed = rospy.get_param('~speed',0.08)
self.tolerance = rospy.get_param('~tolerance', 0.01)
self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
self.tf_listener = tf.TransformListener()
rospy.sleep(1)
self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
self.flag = rospy.get_param('~flag', True)
#tf get position
self.position = Point()
self.position = self.get_position()
self.y_start = self.position.y
self.x_start = self.position.x
#publish cmd_vel
def __init__(self,dis=0.0):
State.__init__(self, outcomes=['succeeded','preempted','aborted'])
rospy.on_shutdown(self.shutdown)
self.test_distance = target.y-0.0
self.rate = 100
self.r = rospy.Rate(self.rate)
self.speed = rospy.get_param('~speed',0.08)
self.tolerance = rospy.get_param('~tolerance', 0.01)
self.cmd_vel = rospy.Publisher('/mobile_base_controller/smooth_cmd_vel',Twist,queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
self.tf_listener = tf.TransformListener()
rospy.sleep(1)
self.tf_listener.waitForTransform(self.odom_frame,self.base_frame,rospy.Time(),rospy.Duration(60.0))
#define a bianliang
self.flag = rospy.get_param('~flag', True)
rospy.loginfo(self.test_distance)
#tf get position
#publish cmd_vel
def __init__(self,angle=0):
State.__init__(self, outcomes=['succeeded','aborted','preempted'])
self.speed = rospy.get_param('~speed', 0.20)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle = angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))
def __init__(self,angle=0):
State.__init__(self, outcomes=['succeeded','aborted','preempted'])
self.speed = rospy.get_param('~speed', 0.20)
self.secretindigal = 1.0
self.tolerance = rospy.get_param('tolerance', math.radians(5))
self.start = True
self.cmd_vel = rospy.Publisher('mobile_base_controller/smooth_cmd_vel', Twist, queue_size=5)
self.base_frame = rospy.get_param('~base_frame', '/base_link')
self.odom_frame = rospy.get_param('~odom_frame', '/odom')
rospy.on_shutdown(self.shutdown)
self.rate = 30
self.start_test = True
self.r = rospy.Rate(self.rate)
self.angle = angle
self.tf_listener = tf.TransformListener()
rospy.sleep(0.5)
self.tf_listener.waitForTransform(self.odom_frame, self.base_frame, rospy.Time(), rospy.Duration(60))