def __init__(self):
rospy.init_node('multiplexer_node', anonymous=False)
rospy.loginfo('\033[93m[%s]\033[0m wait for bringup social_events_memory...'%rospy.get_name())
rospy.wait_for_service('social_events_memory/read_data')
self.rd_event_mem = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
rospy.Subscriber('forwarding_event', ForwardingEvent, self.handle_social_events)
self.pub_rasing_event = rospy.Publisher('raising_events', RaisingEvents, queue_size=10)
self.events_queue = Queue.Queue()
self.recognized_words_queue = Queue.Queue()
event_period = rospy.get_param('~event_period', 0.5)
rospy.Timer(rospy.Duration(event_period), self.handle_trigger_events)
rospy.loginfo('\033[93m[%s]\033[0m initialized...'%rospy.get_name())
rospy.spin()
python类Timer()的实例源码
def __init__(self):
rospy.init_node('gaze', anonymous=False)
self.lock = threading.RLock()
with self.lock:
self.current_state = GazeState.IDLE
self.last_state = self.current_state
# Initialize Variables
self.glance_timeout = 0
self.glance_timecount = 0
self.glance_played = False
self.idle_timeout = 0
self.idle_timecount = 0
self.idle_played = False
rospy.loginfo('\033[92m[%s]\033[0m waiting for bringup social_mind...'%rospy.get_name())
rospy.wait_for_service('environmental_memory/read_data')
rospy.wait_for_service('social_events_memory/read_data')
self.rd_memory = {}
self.rd_memory['environmental_memory'] = rospy.ServiceProxy('environmental_memory/read_data', ReadData)
self.rd_memory['social_events_memory'] = rospy.ServiceProxy('social_events_memory/read_data', ReadData)
rospy.Subscriber('raising_events', RaisingEvents, self.handle_raising_events)
rospy.Subscriber('gaze_focusing', String, self.handle_gaze_focusing)
self.pub_gaze_cmd = rospy.Publisher('gaze_command', GazeCommand, queue_size=10)
self.pub_viz_gaze_cmd = rospy.Publisher('visualization_gaze_cmd', PointStamped, queue_size=10)
rospy.Timer(rospy.Duration(GAZE_CONTROLLER_PERIOD), self.handle_gaze_controller)
rospy.loginfo('\033[92m[%s]\033[0m initialized...'%rospy.get_name())
rospy.spin()
def __init__(self, image_path=None):
height = int(rospy.get_param("~grid_height", 800))
width = int(rospy.get_param("~grid_width", 800))
resolution = rospy.get_param("~grid_resolution", .25)
ogrid_topic = rospy.get_param("/lqrrt_node/ogrid_topic", "/ogrid")
self.grid_drawer = DrawGrid(height, width, image_path)
self.ogrid_pub = rospy.Publisher(ogrid_topic, OccupancyGrid, queue_size=1)
m = MapMetaData()
m.resolution = resolution
m.width = width
m.height = height
pos = np.array([-width * resolution / 2, -height * resolution / 2, 0])
quat = np.array([0, 0, 0, 1])
m.origin = Pose()
m.origin.position.x, m.origin.position.y = pos[:2]
self.map_meta_data = m
rospy.Timer(rospy.Duration(1), self.pub_grid)
def update_step_timer(self, step):
"""
Helper method that shuts the current step timer down, then recreates
the step timer for the next step in the task.
If the step timeout is zero, do not create a new timeout; otherwise
create a new timer using the timeout value.
Args:
step (dict): the dictionary object representing the task step.
"""
if self.step_timer and self.step_timer.is_alive():
self.step_timer.shutdown()
if not self.active:
return
if step and step.timeout > 0:
self.step_timer = rospy.Timer(
rospy.Duration(step.timeout),
self.step_to_handler,
oneshot=True
)
def run(self):
"""
Pi Trees run callback that gets called when a previous task is completed.
Responsible for spawning up next task and passing data
b/t previous task and new one.
"""
# sleep to make sure completion audio and expression don't get
# overrun by the next task
if not self.spawning:
self.spawning = True
if self.spawn_delay > 0:
self.timer = rospy.Timer(rospy.Duration(self.spawn_delay), self.timer_handler, oneshot=True)
else:
self.timer_handler()
return pt.TaskStatus.RUNNING
def __init__(self):
self.marker_id = rospy.get_param("~marker_id", 12)
self.frame_id = rospy.get_param("~frame_id", "odom_meas")
self.camera_frame_id = rospy.get_param("~camera_frame_id", "overhead_cam_frame")
self.count = rospy.get_param("~count", 50)
# local vars:
self.calibrate_flag = False
self.calibrated = False
self.calibrate_count = 0
self.kb = kbhit.KBHit()
self.trans_arr = np.zeros((self.count, 3))
self.quat_arr = np.zeros((self.count, 4))
self.trans_ave = np.zeros(3)
self.quat_ave = np.zeros(3)
# now create subscribers, timers, and publishers
self.br = tf.TransformBroadcaster()
self.kb_timer = rospy.Timer(rospy.Duration(0.1), self.keycb)
rospy.on_shutdown(self.kb.set_normal_term)
self.alvar_sub = rospy.Subscriber("ar_pose_marker", AlvarMarkers, self.alvarcb)
return
def __init__(self, robotName, isConfirmation, suspect, baseBoid):
self.posThreshold = np.array([1,1])
self.timeThreshold = 2
self.lastCheckIn = time.time()
self.lastMsg = None
self.boid = copy.deepcopy(baseBoid)
self.robotName = robotName
self.suspect = suspect
self.isConfirm = isConfirmation
self.suspicionPub = rospy.Publisher('/swarmflock/suspicion', SuspicionMsg, queue_size=10)
self.boidSub = rospy.Subscriber('/' + self.suspect + '/swarmflock/boids', BoidMsg, self.handle_msg)
self.client = WiFiTrilatClient()
# Suspect variables
self.suspectMAC = self.client.hostToIP(self.client.IPtoMAC(self.suspect))
self.suspectVel = np.array([0,0])
self.suspectPos = np.array([0,0])
self.suspicious = False
self.runTimer = rospy.Timer(rospy.Duration(1), self.run)
def __init__(self):
# Holds the current drone status
self.status = -1
# Subscribe to the /ardrone/navdata topic, of message type navdata, and call self.ReceiveNavdata when a message is received
self.subNavdata = rospy.Subscriber('/ardrone/navdata',Navdata,self.ReceiveNavdata)
# Allow the controller to publish to the /ardrone/takeoff, land and reset topics
self.pubLand = rospy.Publisher('/ardrone/land',Empty)
self.pubTakeoff = rospy.Publisher('/ardrone/takeoff',Empty)
self.pubReset = rospy.Publisher('/ardrone/reset',Empty)
# Allow the controller to publish to the /cmd_vel topic and thus control the drone
self.pubCommand = rospy.Publisher('/cmd_vel',Twist)
# Setup regular publishing of control packets
self.command = Twist()
self.commandTimer = rospy.Timer(rospy.Duration(COMMAND_PERIOD/1000.0),self.SendCommand)
# Land the drone if we are shutting down
rospy.on_shutdown(self.SendLand)
def __init__(self,
base_set_command='amixer -c 0 sset Master playback',
base_get_command='amixer -c 0 sget Master playback'):
self.base_set_cmd = base_set_command
self.base_get_cmd = base_get_command
self.curr_vol = 0
rospy.loginfo("VolumeManager using base set command: '" +
self.base_set_cmd + "'")
rospy.loginfo("VolumeManager using base get command: '" +
self.base_get_cmd + "'")
self.sub = rospy.Subscriber('~set_volume',
Int8,
self.cb,
queue_size=1)
# Get volume to start publishing
self.curr_vol = self.get_current_volume()
self.pub = rospy.Publisher('~get_volume',
Int8,
latch=True,
queue_size=1)
self.timer = rospy.Timer(rospy.Duration(1.0), self.curr_vol_cb)
def __init__(self, topic_prefix, config_file, robot_config_file, write_lock, bag_file):
self.topic_prefix = topic_prefix
self.test_config = config_file
self.robot_config_file = robot_config_file
resources_timer_frequency = 100.0 # Hz
self.timer_interval = 1/resources_timer_frequency
self.res_pipeline = {}
self.BfW = BagfileWriter(bag_file, write_lock)
rospy.loginfo("Waiting for obstacle_distance node...")
rospy.wait_for_service(self.robot_config_file["obstacle_distance"]["services"])
self.obstacle_distance_server = rospy.ServiceProxy(self.robot_config_file["obstacle_distance"]["services"],
GetObstacleDistance)
rospy.Timer(rospy.Duration.from_sec(self.timer_interval), self.collect_obstacle_distances)
def __init__(self, root_frame, measured_frame, groundtruth, groundtruth_epsilon):
"""
Class for calculating the distance covered by the given frame in relation to a given root frame.
The tf data is sent over the tf topic given in the robot_config.yaml.
:param root_frame: name of the first frame
:type root_frame: string
:param measured_frame: name of the second frame. The distance will be measured in relation to the root_frame.
:type measured_frame: string
"""
self.active = False
self.root_frame = root_frame
self.measured_frame = measured_frame
self.path_length = 0.0
self.tf_sampling_freq = 20.0 # Hz
self.first_value = True
self.trans_old = []
self.rot_old = []
self.groundtruth = groundtruth
self.groundtruth_epsilon = groundtruth_epsilon
self.finished = False
self.listener = tf.TransformListener()
rospy.Timer(rospy.Duration.from_sec(1 / self.tf_sampling_freq), self.record_tf)
def __init__(self):
self._angle = 0;
self._step = STEP * -1
self._x = LINE_START_X
self._y = LINE_START_Y
self._x = RECT_START_X
self._y = RECT_START_Y
self._step_x = STEP
self._step_y = STEP
self._q_side = 0
self._cf = [Swarmfly(sc=self, cfid=0, color=ColorRGBA(1, 0, 1, 1), hoverpoint=Point(SPACE_OFFSET_X+1.2, SPACE_OFFSET_Y+1.2, SPACE_OFFSET_Z+1), takeoffpoint=Point(SPACE_OFFSET_X+1.25, SPACE_OFFSET_Y+0.15, SPACE_OFFSET_Z+0.5), landingpoint=Point(SPACE_OFFSET_X+0.25, SPACE_OFFSET_Y+0.15, SPACE_OFFSET_Z+0.5)),
Swarmfly(sc=self, cfid=1, color=ColorRGBA(1, 1, 0, 1), hoverpoint=Point(SPACE_OFFSET_X+0.7, SPACE_OFFSET_Y+0.7, SPACE_OFFSET_Z+1), takeoffpoint=Point(SPACE_OFFSET_X+1.75, SPACE_OFFSET_Y+0.15, SPACE_OFFSET_Z+0.5), landingpoint=Point(SPACE_OFFSET_X+0.75, SPACE_OFFSET_Y+0.15, SPACE_OFFSET_Z+0.5))]
for cf in self._cf:
cf.hoverpoint = self._calc_circle_position(cf.id)
self._rand_p1 = None
self._rand_p2 = None
self.timer = rospy.Timer(rospy.Duration(1.0/UPDATE_RATE),
self._run)
def latch_timer(self, step_name, tmr_id, duration, cb, oneshot=False):
step_dict = self.get_step_dict(step_name)
def handle_evt(evt):
if self.step == step_name:
cb(evt)
timers = step_dict.get('timers', {})
# Shutdown old timer with same ID if it exists
timer = timers.get(tmr_id, None)
if timer and timer.is_alive(): timer.shutdown()
timer = rospy.Timer(duration, handle_evt, oneshot=oneshot)
timers[tmr_id] = timer
def odom_cb(self, msg):
self.pose = msg.pose.pose
if not self.tmr:
self.tmr = rospy.Timer(
rospy.Duration(0.1),
self.check_activity
)
def resume(self):
if not self.paused:
return
self.paused = False
if self.current_step.timeout > 0:
self.step_timer = rospy.Timer(
rospy.Duration(self.current_step.timeout - (self.paused_time - self.step_load_time)),
self.step_to_handler,
oneshot=True
)
self.paused_time = None
def sync(self, rospy_timer_event):
"""Handles calls from rospy.Timer to sync the local objects and images
to the interop server.
Args:
rospy_timer_event (rospy.TimerEvent): Unused formal parameter
necessary for making this function work as a callback for
rospy.Timer.
"""
self.objects_dir.sync()
def __init__(self, listenInt, interface, freq, essid, psswd, ip, nm, discoverOnce=True):
self.robotName = os.getenv('HOSTNAME')
self.tolerance = 20
self.x = 0
self.y = 0
self.client = WiFiTrilatClient()
self.discoverOnce = discoverOnce
rospy.init_node(self.robotName + "_wifitrilat_server")
#self.rssiPub = rospy.Publisher('/' + self.robotName + '/WiFiRSSI', WiFiRSSIMsg, queue_size=10)
self.service = rospy.Service("/" + self.robotName + "/WiFiTrilat", WiFiTrilat, self.handle_Trilat)
self.listenInt = listenInt
self.interface = interface
#self.mac = mac.lower()
self.freq = int(freq)
self.msgs = []
cli.execute_shell("ifconfig %s down" % self.listenInt)
#self.wifi = Wireless(self.interface).setFrequency("%.3f" % (float(self.freq) / 1000))
self.connectToNet(essid, psswd,ip, nm)
cli.execute_shell("ifconfig %s up" % self.listenInt)
self.purge = rospy.Timer(rospy.Duration(2), self.msgPurge)
self.heartbeat = rospy.Timer(rospy.Duration(1), self.heartbeat_call)
self.discoverTimer = rospy.Timer(rospy.Duration(20), self.findSelfPos)
sniff(iface=self.listenInt, prn=self.handler, store=0)
rospy.spin()
def __init__(self, robotName, baseBoid, suspect=""):
self.robotName = robotName
self.suspect = ""
self.confirmFor = ""
self.timer = rospy.Timer(rospy.Duration(15), self.reset_suspect)
self.suspSub = rospy.Subscriber('/swarmflock/suspicion', SuspicionMsg, self.handle_suspicion)
self.manualSuspect = suspect
def __init__(self):
self.define()
rospy.Subscriber('speak_string', String, self.SpeedCB, queue_size=1)
rospy.Timer(rospy.Duration(self.ResponseSensitivity), self.TimerCB)
rospy.spin()
def main(args):
rospy.init_node('enhanced_rostopic_statistic')
if not args.window:
args.window = 10
ts = TopicStatistic(rospy.Duration(args.window), dst_topic=args.TOPIC, ref_topic=args.ref)
rospy.Timer(rospy.Duration(1.0), lambda x: ts.process())
rospy.spin()
def __init__(self, topic_prefix, config_file, robot_config_file, write_lock, bag_file):
self.topic_prefix = topic_prefix
self.test_config = config_file
self.resources_timer_frequency = 4.0 # Hz
self.timer_interval = 1/self.resources_timer_frequency
self.testblock_list = self.create_testblock_list()
self.pid_list = self.create_pid_list()
self.requested_nodes = {}
self.res_pipeline = {}
self.BfW = BagfileWriter(bag_file, write_lock)
rospy.Timer(rospy.Duration.from_sec(self.timer_interval), self.collect_resource_data)
def __init__(self):
#Loads the robot model, which contains the robot's kinematics information
self.robot = URDF.from_parameter_server()
#Subscribes to information about what the current joint values are.
rospy.Subscriber("/joint_states", JointState, self.joint_callback)
#Subscribes to command for end-effector pose
rospy.Subscriber("/cartesian_command", Transform, self.command_callback)
#Subscribes to command for redundant dof
rospy.Subscriber("/redundancy_command", Float32, self.redundancy_callback)
# Publishes desired joint velocities
self.pub_vel = rospy.Publisher("/joint_velocities", JointState, queue_size=1)
#This is where we hold the most recent joint transforms
self.joint_transforms = []
self.q_current = []
self.x_current = tf.transformations.identity_matrix()
self.R_base = tf.transformations.identity_matrix()
self.x_target = tf.transformations.identity_matrix()
self.q0_desired = 0
self.last_command_time = 0
self.last_red_command_time = 0
# Initialize timer that will trigger callbacks
self.mutex = Lock()
self.timer = rospy.Timer(rospy.Duration(0.1), self.timer_callback)
def __init__(self):
#Loads the robot model, which contains the robot's kinematics information
self.robot = URDF.from_parameter_server()
# Publishes Cartesian goals
self.pub_command = rospy.Publisher("/cartesian_command", geometry_msgs.msg.Transform,
queue_size=1)
# Publishes Redundancy goals
self.pub_red = rospy.Publisher("/redundancy_command", Float32, queue_size=1)
# Publisher to set robot position
self.pub_reset = rospy.Publisher("/joint_command", JointState, queue_size=1)
#This is where we hold the most recent joint transforms
self.joint_transforms = []
self.x_current = tf.transformations.identity_matrix()
self.R_base = tf.transformations.identity_matrix()
#Create "Interactive Marker" that we can manipulate in RViz
self.init_marker()
self.ee_tracking = 0
self.red_tracking = 0
self.q_current = []
self.x_target = tf.transformations.identity_matrix()
self.q0_desired = 0
self.mutex = Lock()
self.timer = rospy.Timer(rospy.Duration(0.3), self.timer_callback)
#Subscribes to information about what the current joint values are.
rospy.Subscriber("joint_states", JointState, self.joint_callback)
#Resets the robot to a known pose
def __init__(self, get_model_callback, model_callback):
rospy.init_node('tlight_model')
self.model = get_model_callback()
self.get_model = get_model_callback
self.predict = model_callback
self.bridge = CvBridge()
self.boxes = None
self.img = None
self.img_out = None
self.image_lock = threading.RLock()
#self.sub = rospy.Subscriber('/left_camera/image_color/compressed', CompressedImage, self.updateImage) # Use for CompressedImage topic
self.sub = rospy.Subscriber('/image_raw', Image, self.updateImage, queue_size=1)
self.pub = rospy.Publisher('/out_image', Image, queue_size=1)
rospy.Timer(rospy.Duration(0.04), self.callbackImage)
def __init__(self):
self.rate = rospy.get_param("~rate", 20.0)
self.period = 1.0 / self.rate
# angular mode maps angular z directly to steering angle
# (adjusted appropriately)
# non-angular mode is somewhat suspect, but it turns
# a linear y into a command to turn just so that the
# achieved linear x and y match the desired, though
# the vehicle has to turn to do so.
# Non-angular mode is not yet supported.
self.angular_mode = rospy.get_param("~angular_mode", True)
# Not used yet
# self.tf_buffer = tf2_ros.Buffer()
# self.tf = tf2_ros.TransformListener(self.tf_buffer)
# broadcast odometry
self.br = tf2_ros.TransformBroadcaster()
self.ts = TransformStamped()
self.ts.header.frame_id = "map"
self.ts.child_frame_id = "base_link"
self.ts.transform.rotation.w = 1.0
self.angle = 0
# The cmd_vel is assumed to be in the base_link frame centered
# on the middle of the two driven wheels
# This is half the distance between the two drive wheels
self.base_radius = rospy.get_param("~base_radius", 0.06)
self.wheel_radius = rospy.get_param("~wheel_radius", 0.04)
self.left_wheel_joint = rospy.get_param("~left_wheel_joint", "wheel_front_left_joint")
self.right_wheel_joint = rospy.get_param("~right_wheel_joint", "wheel_front_right_joint")
self.point_pub = rospy.Publisher("cmd_vel_spin_center", PointStamped, queue_size=1)
self.joint_pub = {}
self.joint_pub['left'] = rospy.Publisher("front_left/joint_state",
JointState, queue_size=1)
self.joint_pub['right'] = rospy.Publisher("front_right/joint_state",
JointState, queue_size=1)
# TODO(lucasw) is there a way to get TwistStamped out of standard
# move_base publishers?
# TODO(lucasw) make this more generic, read in a list of any number of wheels
# the requirement is that that all have to be aligned, and also need
# a set spin center.
self.ind = {}
self.joint_state = {}
self.joint_state['left'] = JointState()
self.joint_state['left'].name.append(self.left_wheel_joint)
self.joint_state['left'].position.append(0.0)
self.joint_state['left'].velocity.append(0.0)
self.joint_state['right'] = JointState()
self.joint_state['right'].name.append(self.right_wheel_joint)
self.joint_state['right'].position.append(0.0)
self.joint_state['right'].velocity.append(0.0)
self.cmd_vel = Twist()
rospy.Subscriber("cmd_vel", Twist, self.cmd_vel_callback, queue_size=2)
self.timer = rospy.Timer(rospy.Duration(self.period), self.update)
def detect_and_visualize(self, root_dir=None, extension=None,
classes=[], thresh=0.6, show_timer=False):
global imgi
global img_rect
global Frame
global show
global trackpos
global imshow
global acc_pub
global acc_enable
acc_pub = rospy.Publisher("acc_cmd", ACCCommand, queue_size=2)
acc_enable = rospy.Publisher("acc_enable", Bool, queue_size=2)
# rospy.Timer(rospy.Duration(0.02), self.trackCallback)
rospy.Subscriber("/zed/left/image_rect_color/compressed",CompressedImage, self.ImgCcallback, queue_size = 4)
# rospy.Subscriber("/zed/left/image_rect_color",Image, self.Imgcallback, queue_size = 4)
rospy.Subscriber("/zed/depth/depth_registered",Image, self.DepthImgcallback, queue_size = 4)
im_path = '/home/wolfram/mxnet/example/ssd/data/demo/dog.jpg'
with open(im_path, 'rb') as fp:
img_content = fp.read()
trackpos = 0
imshow = 0
imgi = mx.img.imdecode(img_content)
while(1):
# ret,img_rect = cap.read()
dets = self.im_detect(root_dir, extension, show_timer=show_timer)
# if not isinstance(im_list, list):
# im_list = [im_list]
# assert len(dets) == len(im_list)
# for k, det in enumerate(dets):
# img[:, :, (0, 1, 2)] = img[:, :, (2, 1, 0)]
# img = img_rect.copy()
self.visualize_detection(img_rect, dets[0], classes, thresh)
if imshow == 1:
cv2.imshow("tester", show)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
def __init__(self, odom_topic, ref_topic, move_topic, path_topic, tree_topic,
goal_topic, focus_topic, effort_topic, ogrid_topic, ogrid_threshold):
"""
Initialize with topic names and ogrid threshold as applicable.
Defaults are generated at the ROS params level.
"""
# One-time initializations
self.revisit_period = 0.05 # s
self.ogrid = None
self.ogrid_threshold = float(ogrid_threshold)
self.state = None
self.tracking = None
self.done = True
# Lil helpers
self.rostime = lambda: rospy.Time.now().to_sec()
self.intup = lambda arr: tuple(np.array(arr, dtype=np.int64))
self.get_hood = lambda img, row, col: img[row-1:row+2, col-1:col+2]
# Set-up planners
self.behaviors_list = [car, boat, escape]
for behavior in self.behaviors_list:
behavior.planner.set_system(erf=self.erf)
behavior.planner.set_runtime(sys_time=self.rostime)
behavior.planner.constraints.set_feasibility_function(self.is_feasible)
# Initialize resetable stuff
self.reset()
# Subscribers
rospy.Subscriber(odom_topic, Odometry, self.odom_cb)
rospy.Subscriber(ogrid_topic, OccupancyGrid, self.ogrid_cb)
rospy.sleep(0.5)
# Publishers
self.ref_pub = rospy.Publisher(ref_topic, Odometry, queue_size=1)
self.path_pub = rospy.Publisher(path_topic, PoseArray, queue_size=3)
self.tree_pub = rospy.Publisher(tree_topic, PoseArray, queue_size=3)
self.goal_pub = rospy.Publisher(goal_topic, PoseStamped, queue_size=3)
self.focus_pub = rospy.Publisher(focus_topic, PointStamped, queue_size=3)
self.eff_pub = rospy.Publisher(effort_topic, WrenchStamped, queue_size=3)
self.sampspace_pub = rospy.Publisher(sampspace_topic, PolygonStamped, queue_size=3)
self.guide_pub = rospy.Publisher(guide_topic, PointStamped, queue_size=3)
# Actions
self.move_server = actionlib.SimpleActionServer(move_topic, MoveAction, execute_cb=self.move_cb, auto_start=False)
self.move_server.start()
rospy.sleep(1)
# Timers
rospy.Timer(rospy.Duration(self.revisit_period), self.publish_ref)
rospy.Timer(rospy.Duration(self.revisit_period), self.action_check)
def __init__(self):
self.rate = rospy.get_param("~rate", 20.0)
self.period = 1.0 / self.rate
# angular mode maps angular z directly to steering angle
# (adjusted appropriately)
# non-angular mode is somewhat suspect, but it turns
# a linear y into a command to turn just so that the
# achieved linear x and y match the desired, though
# the vehicle has to turn to do so.
self.angular_mode = rospy.get_param("~angular_mode", True)
self.tf_buffer = tf2_ros.Buffer()
self.tf = tf2_ros.TransformListener(self.tf_buffer)
self.steer_link = rospy.get_param("~steer_link", "lead_steer")
self.steer_joint = rospy.get_param("~steer_joint", "lead_steer_joint")
# +/- this angle
self.min_steer_angle = rospy.get_param("~min_steer_angle", -0.7)
self.max_steer_angle = rospy.get_param("~max_steer_angle", 0.7)
self.wheel_joint = rospy.get_param("~wheel_joint", "wheel_lead_axle")
self.wheel_radius = rospy.get_param("~wheel_radius", 0.15)
# the spin center is always on the fixed axle y axis of the fixed axle,
# it is assume zero rotation on the steer_joint puts the steering
# at zero rotation with respect to fixed axle x axis (or xz plane)
self.fixed_axle_link = rospy.get_param("~fixed_axle_link", "back_axle")
self.point_pub = rospy.Publisher("cmd_vel_spin_center", PointStamped, queue_size=1)
self.steer_pub = rospy.Publisher("steer_joint_states", JointState, queue_size=1)
# TODO(lucasw) is there a way to get TwistStamped out of standard
# move_base publishers?
self.joint_state = JointState()
self.joint_state.name.append(self.steer_joint)
self.joint_state.position.append(0.0)
self.joint_state.velocity.append(0.0)
self.joint_state.name.append(self.wheel_joint)
self.joint_state.position.append(0.0)
self.joint_state.velocity.append(0.0)
self.cmd_vel = Twist()
rospy.Subscriber("cmd_vel", Twist, self.cmd_vel_callback, queue_size=2)
self.timer = rospy.Timer(rospy.Duration(self.period), self.update)