def update_telemetry(navsat_msg, compass_msg):
"""Telemetry subscription callback.
Args:
navsat_msg: sensor_msgs/NavSatFix message.
compass_msg: std_msgs/Float64 message in degrees.
"""
try:
client.post_telemetry(navsat_msg, compass_msg)
except (ConnectionError, Timeout) as e:
rospy.logwarn(e)
return
except (ValueError, HTTPError) as e:
rospy.logerr(e)
return
except Exception as e:
rospy.logfatal(e)
return
python类NavSatFix()的实例源码
def test_post_telemetry(self):
"""Tests posting telemetry data through client."""
# Set up test data.
url = "http://interop"
client_args = (url, "testuser", "testpass", 1.0)
with InteroperabilityMockServer(url) as server:
# Setup mock server.
server.set_root_response()
server.set_login_response()
server.set_telemetry_response()
# Connect client.
client = InteroperabilityClient(*client_args)
client.wait_for_server()
client.login()
client.post_telemetry(NavSatFix(), Float64())
def test_telemetry_serializer(self):
"""Tests telemetry serializer."""
# Set up test data.
navsat = NavSatFix()
navsat.latitude = 38.149
navsat.longitude = -76.432
navsat.altitude = 30.48
compass = Float64(90.0)
data = serializers.TelemetrySerializer.from_msg(navsat, compass)
altitude_msl = serializers.meters_to_feet(navsat.altitude)
# Compare.
self.assertEqual(data["latitude"], navsat.latitude)
self.assertEqual(data["longitude"], navsat.longitude)
self.assertEqual(data["altitude_msl"], altitude_msl)
self.assertEqual(data["uas_heading"], compass.data)
def _subscribe(self):
assert self.state_subscriber is None
assert self.position_subscriber is None
self.state_subscriber = rospy.Subscriber(
'%s/mavros/state' % self.namespace, State,
callback=self._state_callback)
self.position_subscriber = rospy.Subscriber(
'%s/mavros/global_position/global' % self.namespace, NavSatFix,
callback=self._position_callback)