def publish_info(self, imu):
self.imu_msg = Imu()
self.imu_msg.linear_acceleration.x = imu.linear_acceleration_x
self.imu_msg.linear_acceleration.y = imu.linear_acceleration_y
self.imu_msg.linear_acceleration.z = imu.linear_acceleration_z
self.imu_msg.angular_velocity.x = imu.angular_velocity_x
self.imu_msg.angular_velocity.y = imu.angular_velocity_y
self.imu_msg.angular_velocity.z = imu.angular_velocity_z
self.imu_msg.orientation.x = imu.orientation_x
self.imu_msg.orientation.y = imu.orientation_y
self.imu_msg.orientation.z = imu.orientation_z
self.imu_msg.orientation.w = imu.orientation_w
self.imu_msg.header.stamp = rospy.Time.now()
self.imu_msg.header.frame_id = self.frame_name
self.imu_msg.header.seq = self.seq
self.pub_imu.publish(self.imu_msg)
self.seq += 1
python类Imu()的实例源码
def __init__(self):
if rospy.has_param('~orientation_offset'):
# Orientation offset as quaterion q = [x,y,z,w].
self.orientation_offset = rospy.get_param('~orientation_offset')
else:
yaw_offset_deg = rospy.get_param('~yaw_offset_deg', 0.0)
self.orientation_offset = tf.quaternion_from_euler(0.0, 0.0, math.radians(yaw_offset_deg))
rospy.Subscriber(rospy.get_name() + "/imu_in", Imu, self.imu_callback)
self.pub_imu_out = rospy.Publisher(rospy.get_name() + '/imu_out',
Imu, queue_size=10)
rospy.spin()
def __init__(self):
# Read Settings
self.read_settings()
# Init other variables
self._num_magnetometer_reads = 0
self._latest_bearings = np.zeros(shape = (self._number_samples_average, 1))
self._received_enough_samples = False
# Subscribe to magnetometer topic
rospy.Subscriber("magnetic_field", Vector3Stamped, self.magnetic_field_callback)
# Publishers
self._pub_bearing_raw = rospy.Publisher(rospy.get_name() + '/bearing_raw_deg',
Float64, queue_size = 10)
self._pub_bearing_avg = rospy.Publisher(rospy.get_name() + '/bearing_avg_deg',
Float64, queue_size = 10)
self._pub_imu_bearing_avg = rospy.Publisher(rospy.get_name() + '/imu_bearing_avg',
Imu, queue_size = 10)
if self._verbose:
self._pub_mag_corrected = rospy.Publisher(rospy.get_name() + '/mag_corrected',
Vector3Stamped, queue_size = 10)
rospy.spin()
def __init__(self):
self.petrone = Petrone()
# subscriber
self.sub_flight = rospy.Subscriber('cmd_fly', Int8, self.cb_fly, queue_size=1)
self.sub_cmd = rospy.Subscriber('cmd_vel_raw', Twist, self.cb_cmd, queue_size=1)
self.sub_color = rospy.Subscriber('led_color', String, self.cb_color, queue_size=1)
# publisher
self.pub_battery = rospy.Publisher('battery', Float32, queue_size=1)
self.pub_status_flight = rospy.Publisher('status/flight', Int8, queue_size=1)
self.pub_status_system = rospy.Publisher('status/system', Int8, queue_size=1)
self.pub_imu = rospy.Publisher('imu', Imu, queue_size=1)
# cache
self.is_disconnected = True
self.last_values = defaultdict(lambda: 0)
# flight parameters
self.twist = Twist()
self.twist_at = 0
def __init__(self):
self.rate = rospy.get_param('~rate', 100.0)
self.imu_name = rospy.get_param('~imu_name', 'imu_steer')
self.topic_name = rospy.get_param('~topic_name', 'topic_name')
self.roll = 0.0
self.pitch = 0.0
self.yaw = 0.0
self.pub_imu_roll_msg = Float32()
self.pub_imu_pitch_msg = Float32()
self.pub_imu_yaw_msg = Float32()
self.pub_imu_roll = rospy.Publisher('/' + self.imu_name +'/roll', Float32, queue_size=1)
self.pub_imu_pitch = rospy.Publisher('/' + self.imu_name +'/pitch', Float32, queue_size=1)
self.pub_imu_yaw = rospy.Publisher('/' + self.imu_name +'/yaw', Float32, queue_size=1)
self.sub = rospy.Subscriber(self.topic_name, Imu, self.process_imu_message, queue_size=1)
rate = rospy.Rate(self.rate)
while not rospy.is_shutdown():
rate.sleep()
def yaw_to_imu(yaw):
# WARNING: we assume zero roll and zero pitch!
q_avg = tf.quaternion_from_euler(0.0, 0.0, yaw);
imu_msg = Imu()
imu_msg.orientation.w = q_avg[3]
imu_msg.orientation.x = q_avg[0]
imu_msg.orientation.y = q_avg[1]
imu_msg.orientation.z = q_avg[2]
pub_imu.publish(imu_msg)
def __init__(self):
self.lock = threading.Lock()
self.sub_imu = rospy.Subscriber('imu', Imu, self.imu_cb)
self.last_imu_angle = 0
self.imu_angle = 0
while not rospy.is_shutdown():
rospy.sleep(0.3)
rospy.loginfo("imu_angle:"+str((self.imu_angle)*180/3.1415926))
def __init__(self):
self.lock = threading.Lock()
self.sub_imu = rospy.Subscriber('imu', Imu, self.imu_cb)
self.last_imu_angle = 0
self.imu_angle = 0
while not rospy.is_shutdown():
self.last_imu_angle = self.imu_angle
rospy.sleep(10)
rospy.loginfo("imu_drift:"+str((self.imu_angle-self.last_imu_angle)*180/3.1415926))
def __init__(self):
self.pub = rospy.Publisher(cmd_vel,Twist,queue_size =1)
self.subscriber = rospy.Subscriber(Imu_topic,Imu,self.callback)
saveAccEff.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def subscribe(self):
self.subscribeAcc = rospy.Subscriber("/robot/accelerometer/left_accelerometer/state",Imu, self.callback)
self.subscribeEff = rospy.Subscriber("/robot/joint_states", JointState, self.callback2)
rospy.spin()
def handle_acc(self, msg):
# Check header of Imu msg
t = msg.header.stamp.secs
acc = (msg.linear_acceleration.x,
msg.linear_acceleration.y,
msg.linear_acceleration.z)
# TODO check how uniform t is, if not investigate correct
# filtering option
self.acc_t.append(t)
self.acc.append(acc)
def proc_emg(emg, moving):
# create an array of ints for emg data
# and moving data
emgPub.publish(EmgArray(emg, moving))
# Package the IMU data into an Imu message
def __init__(self, use_acc_only):
self._init_obs()
config = copy.deepcopy(HYPERPARAMS)
self._hyperparams = config
self._use_acc_only = use_acc_only
if self._hyperparams['constraint']:
import superball_kinematic_tool as skt
self._constraint = skt.KinematicMotorConstraints(
self._hyperparams['constraint_file'], **self._hyperparams['constraint_params']
)
rospy.init_node('superball_agent', disable_signals=True, log_level=rospy.DEBUG)
self._obs_update_lock = threading.Lock()
self._motor_pos_sub = rospy.Subscriber(
'/ranging_data_matlab', Float32MultiArray,
callback=self._motor_pos_cb, queue_size=1
)
self._imu_sub = []
for i in range(12):
self._imu_sub.append(
rospy.Subscriber(
SUPERBALL_IMU_TOPICS[i], Imu,
callback=self._imu_cb, queue_size=1
)
)
self._ctrl_pub = rospy.Publisher('/superball/control', String, queue_size=1)
self._timestep_pub = rospy.Publisher('/superball/timestep', UInt16, queue_size=1)
self._init_motor_pubs()
self._run_sim = False
self._sim_thread = threading.Thread(target=self._continue_simulation)
self._sim_thread.daemon = True
self._sim_thread.start()
self._action_rate = rospy.Rate(10)
def __init__(self):
self.rate = rospy.get_param('~rate', 100.0)
self.topic_name_angle = rospy.get_param('~topic_name_angle', 'topic_name_angle')
self.topic_name_reference = rospy.get_param('~topic_name_reference', 'topic_name_reference')
self.imu_name = rospy.get_param('~imu_name', 'imu_steer')
self.roll_steer = 0.0
self.pitch_steer = 0.0
self.yaw_steer = 0.0
self.roll_frame = 0.0
self.pitch_frame = 0.0
self.yaw_frame = 0.0
self.enable_reference = False
self.enable_angle = False
self.pub_imu_roll_msg = Float32()
self.pub_imu_pitch_msg = Float32()
self.pub_imu_yaw_msg = Float32()
self.pub_imu_roll = rospy.Publisher('/' + self.imu_name +'/roll', Float32, queue_size=1)
self.pub_imu_pitch = rospy.Publisher('/' + self.imu_name +'/pitch', Float32, queue_size=1)
self.pub_imu_yaw = rospy.Publisher('/' + self.imu_name +'/yaw', Float32, queue_size=1)
self.sub = rospy.Subscriber(self.topic_name_angle, Imu, self.process_imu_message_angle, queue_size=1)
self.sub = rospy.Subscriber(self.topic_name_reference, Imu, self.process_imu_message_reference, queue_size=1)
rate = rospy.Rate(self.rate)
while not rospy.is_shutdown():
rate.sleep()
def pubeuler():
rospy.init_node('pub_euler')
pubimu = rospy.Publisher('euler_imu', PoseStamped,queue_size = 10)
pubodom = rospy.Publisher('euler_odom', PoseStamped,queue_size = 10)
pubfiltered = rospy.Publisher('euler_filtered', PoseStamped,queue_size = 10)
pubmeas = rospy.Publisher('euler_meas',PoseStamped,queue_size = 10)
rospy.Subscriber('/imu',Imu,callbackimu, callback_args=pubimu)
rospy.Subscriber('/odom',Odometry,callbackodom, callback_args=pubodom)
rospy.Subscriber('/odometry/filtered',Odometry,callbackfiltered, callback_args=pubfiltered)
rospy.Subscriber('/meas_pose',Odometry,callbackmeas,callback_args=pubmeas)
rospy.spin()
def magnetic_field_callback(self, magnetometer_msg):
# Correct magnetic filed
raw_mag = np.array([magnetometer_msg.vector.x,
magnetometer_msg.vector.y,
magnetometer_msg.vector.z])
# corrected_mag = compensation * (raw_mag - offset)
corrected_mag = np.dot(self._calibration_compensation,
raw_mag - self._calibration_offset)
# compute yaw angle using corrected magnetometer measurements
# and ASSUMING ZERO pitch and roll of the magnetic sensor!
# adapted from
# https://github.com/KristofRobot/razor_imu_9dof/blob/indigo-devel/src/Razor_AHRS/Compass.ino
corrected_mag = corrected_mag / np.linalg.norm(corrected_mag)
mag_bearing = math.atan2(corrected_mag[1], -corrected_mag[0])
# add declination and constant bearing offset
mag_bearing = mag_bearing + self._declination + self._bearing_offset
# publish unfiltered bearing, degrees only for debug purposes
self._pub_bearing_raw.publish(Float64(math.degrees(mag_bearing)))
# compute mean
self._latest_bearings[self._num_magnetometer_reads] = mag_bearing
self._num_magnetometer_reads += 1
if self._num_magnetometer_reads >= self._number_samples_average:
self._num_magnetometer_reads = 0 # delete oldest samples
self._received_enough_samples = True
if self._received_enough_samples:
bearing_avg = self.angular_mean(self._latest_bearings)
else:
# not enough samples, use latest value
bearing_avg = mag_bearing
# WARNING: we assume zero roll and zero pitch!
q_avg = tf.quaternion_from_euler(0.0, 0.0, bearing_avg);
imu_msg = Imu()
imu_msg.orientation.w = q_avg[3]
imu_msg.orientation.x = q_avg[0]
imu_msg.orientation.y = q_avg[1]
imu_msg.orientation.z = q_avg[2]
self._pub_bearing_avg.publish(Float64(math.degrees(bearing_avg)))
self._pub_imu_bearing_avg.publish(imu_msg)
# debug
if self._verbose:
rospy.loginfo("bearing_avg : " +
str(math.degrees(bearing_avg)) + " deg")
mag_corrected_msg = magnetometer_msg
mag_corrected_msg.vector.x = corrected_mag[0]
mag_corrected_msg.vector.y = corrected_mag[1]
mag_corrected_msg.vector.z = corrected_mag[2]
self._pub_mag_corrected.publish(mag_corrected_msg)
def __init__(self):
self.recording = False
self.names = ('sair', 'sail', 'fai', 'faii')
self.data = {n: [] for n in self.names}
# TODO hardcoded for left arm
# self.acc_sub = rospy.Subscriber(
# '/robot/accelerometer/left_accelerometer/state',
# Imu,
# self.handle_acc)
self.sensor_sub = rospy.Subscriber(
'/sensor_values',
Int32MultiArray,
self.handle_sensor)
# self.kb_sub = rospy.Subscriber('/keyboard/keyup',
# Key,
# self.keyboard_cb, queue_size=10)
# Visualising the below pulished signals in rqt_plot is recommended
self.sai_pub = rospy.Publisher(
'/finger_sensor/sai',
FingerSAI,
queue_size=5)
self.fai_pub = rospy.Publisher(
'/finger_sensor/fai',
FingerFAI,
queue_size=5)
self.faii_pub = rospy.Publisher(
'/finger_sensor/faii',
Float64,
queue_size=5)
self.sensor_values = []
self.sensor_values = deque(maxlen=80)
self.acc_t = deque(maxlen=400)
self.acc = deque(maxlen=400)
# 0.66pi rad/sample (cutoff frequency over nyquist frequency
# (ie, half the sampling frequency)). For the wrist, 33 Hz /
# (100 Hz/ 2). TODO Double check arguments
self.b1, self.a1 = signal.butter(1, 0.66, 'high', analog=False)
# 0.5p rad/sample. For the tactile sensor, 5 Hz / (20 Hz / 2).
self.b, self.a = signal.butter(1, 0.5, 'high', analog=False)
def proc_imu(quat1, acc, gyro):
# New info:
# https://github.com/thalmiclabs/myo-bluetooth/blob/master/myohw.h#L292-L295
# Scale values for unpacking IMU data
# define MYOHW_ORIENTATION_SCALE 16384.0f
# See myohw_imu_data_t::orientation
# define MYOHW_ACCELEROMETER_SCALE 2048.0f
# See myohw_imu_data_t::accelerometer
# define MYOHW_GYROSCOPE_SCALE 16.0f
# See myohw_imu_data_t::gyroscope
if not any(quat1):
# If it's all 0's means we got no data, don't do anything
return
h = Header()
h.stamp = rospy.Time.now()
# frame_id is node name without /
h.frame_id = rospy.get_name()[1:]
# We currently don't know the covariance of the sensors with each other
cov = [0, 0, 0, 0, 0, 0, 0, 0, 0]
quat = Quaternion(quat1[0] / 16384.0,
quat1[1] / 16384.0,
quat1[2] / 16384.0,
quat1[3] / 16384.0)
# Normalize the quaternion and accelerometer values
quatNorm = sqrt(quat.x * quat.x + quat.y *
quat.y + quat.z * quat.z + quat.w * quat.w)
normQuat = Quaternion(quat.x / quatNorm,
quat.y / quatNorm,
quat.z / quatNorm,
quat.w / quatNorm)
normAcc = Vector3(acc[0] / 2048.0,
acc[1] / 2048.0,
acc[2] / 2048.0)
normGyro = Vector3(gyro[0] / 16.0, gyro[1] / 16.0, gyro[2] / 16.0)
imu = Imu(h, normQuat, cov, normGyro, cov, normAcc, cov)
imuPub.publish(imu)
roll, pitch, yaw = euler_from_quaternion([normQuat.x,
normQuat.y,
normQuat.z,
normQuat.w])
oriPub.publish(Vector3(roll, pitch, yaw))
oriDegPub.publish(Vector3(degrees(roll), degrees(pitch), degrees(yaw)))
posePub.publish( PoseStamped(h,Pose(Point(0.0,0.0,0.0),normQuat)) )
# Package the arm and x-axis direction into an Arm message
def __init__(self):
self.degrees2rad = math.pi/180.0
# Get values from launch file
self.rate = rospy.get_param('~rate', 80.0) # the rate at which to publish the transform
# Static transform between sensor and fixed frame: x, y, z, roll, pitch, yaw
# <rosparam param="static_transform">[0, 0, 0, 0, 0, 0]</rosparam>
self.static_transform = rospy.get_param('~static_transform', [0, 0, 0, 0, 0, 0])
self.serial_port = rospy.get_param('~serial_port', "/dev/ttyUSB0")
self.topic_name = rospy.get_param('~topic_name', "/imu")
self.fixed_frame = rospy.get_param('~fixed_frame', "world")
self.frame_name = rospy.get_param('~frame_name', "imu")
self.publish_transform = rospy.get_param('~publish_transform', False)
self.imu = ImuDriver(serial_port=self.serial_port)
self.imu.init_imu()
# Create a publisher for imu message
self.pub_imu = rospy.Publisher(self.topic_name, Imu, queue_size=1)
self.odomBroadcaster_imu = TransformBroadcaster()
self.imu_msg = Imu()
self.imu_msg.orientation_covariance[0] = -1
self.imu_msg.angular_velocity_covariance[0] = -1
self.imu_msg.linear_acceleration_covariance[0] = -1
self.current_time = rospy.get_time()
self.last_time = rospy.get_time()
self.seq = 0
rospy.on_shutdown(self.shutdown_node)
rate = rospy.Rate(self.rate)
rospy.loginfo("Ready for publishing imu:" + self.serial_port)
# Main while loop.
while not rospy.is_shutdown():
self.current_time = rospy.get_time()
self.imu.read()
if self.publish_transform:
quaternion = self.imu.quaternion_from_euler(self.static_transform[3]*self.degrees2rad,
self.static_transform[4]*self.degrees2rad,
self.static_transform[5]*self.degrees2rad)
# send static transformation tf between imu and fixed frame
self.odomBroadcaster_imu.sendTransform(
(self.static_transform[0], self.static_transform[1], self.static_transform[2]),
(quaternion[0], quaternion[1], quaternion[2], quaternion[3]),
rospy.Time.now(), self.frame_name, self.fixed_frame
)
# publish imu message
self.publish_info(imu=self.imu)
rate.sleep()