def numersum(test_median, data_point):
    """Return the inverse-distance weight (1 / km) of one data point.

    The weight is the reciprocal of the distance in kilometers between
    ``test_median`` and ``data_point``; it is the per-point term used when
    adjusting the candidate latitude/longitude in the Weiszfeld iteration.

    :param test_median: current candidate median, passed as the first point
        to ``vincenty`` / ``great_circle``
    :param data_point: the data point to weight
    :return: ``1 / distance_km``, or ``0`` when the distance is zero
    """
    try:
        separation_km = vincenty(test_median, data_point).kilometers
        weight = 1 / separation_km
    except ZeroDivisionError:
        # A point that coincides with the candidate median is filtered out;
        # otherwise the iteration would not converge.
        weight = 0
    except ValueError:
        # Vincenty does not always converge; great circle distance is less
        # accurate but always yields a result.
        weight = 1 / great_circle(test_median, data_point).kilometers
    return weight
# Example source code using Python's vincenty()
def objfunc(test_median, data_points):
    """Return the total distance in kilometers from a candidate to all points.

    This is the objective function being minimised: the sum of the distances
    between ``test_median`` and every entry of ``data_points``.

    :param test_median: candidate median location
    :param data_points: collection of points to measure against
    :return: accumulated distance in kilometers as a float (``0.0`` when
        ``data_points`` is empty)
    """
    total_km = 0.0
    for candidate in data_points:
        try:
            leg_km = vincenty(test_median, candidate).kilometers
        except ValueError:
            # Vincenty does not always converge; great circle distance is
            # less accurate but always yields a result.
            leg_km = great_circle(test_median, candidate).kilometers
        total_km += leg_km
    return total_km
def MotherShipDistance():
    """Report how far the car is from the Tesla factory in Fremont.

    Reads ``latitude``, ``longitude``, ``distscale`` and ``distunits`` from
    the enclosing (module) scope and returns the result of ``statement()``
    applied to the spoken sentence.

    The Vincenty distance is obtained in miles, multiplied by ``distscale``
    and only then truncated to a whole number, so that the truncation error
    is not amplified by the unit scaling.
    """
    home = (latitude, longitude)
    fremont_ca = (37.5483, -121.9886)
    miles = vincenty(fremont_ca, home).miles
    # Scale first, truncate once: truncating to whole miles before applying
    # distscale would carry up to one mile of error into the scaled value.
    distance = int(miles * distscale)
    text = "Your car is currently %d %s from the Tesla factory in Fremont California" % (distance, distunits)
    return statement(text)
# INTENTS THAT SEND COMMANDS TO THE CAR
# "Unlock my car for 10 minutes."
def start_recording(self):
    """Configure the GPS receiver on ``self.port`` and poll it for fixes.

    Writes three ``$PMTK`` command strings to the port, then loops for as
    long as ``self.shutdown`` is truthy, reading one line per iteration:

    * ``$GPRMC`` lines update ``self.activated`` and, when field 2 is
      ``b'A'``, ``self.lat``, ``self.long`` and ``self.last_time``.
    * ``$GPVTG`` lines, while ``self.activated`` is set, have field 7 parsed
      as a float and passed to every callable in ``self.callbacks``.

    Every iteration ends with ``yield from asyncio.sleep(0.1)``.

    NOTE(review): the loop condition is ``self.shutdown`` itself, not its
    negation -- confirm the flag's meaning against the owning class.
    NOTE(review): the hemisphere fields of ``$GPRMC`` are not read here, so
    the stored latitude/longitude carry no sign.
    """
    print("inside gps")
    for command in (b"$PMTK397,0*23F\r\n",
                    b"$PMTK397,0.2*3F\r\n",
                    b"$PMTK220,100*2F\r\n"):
        self.port.write(command)

    while self.shutdown:
        sentence = self.port.readline()
        if sentence.startswith(b'$GPRMC'):
            fields = sentence.split(b',')
            if fields[2] == b'A':
                timestamp = time.time()
                self.activated = True
                try:
                    # ddmm.mmmm / dddmm.mmmm -> whole degrees + minutes / 60
                    raw_lat, raw_lon = fields[3], fields[5]
                    lat = int(raw_lat[:2]) + float(raw_lat[2:].decode('ascii', errors='ignore')) / 60
                    lon = int(raw_lon[:3]) + float(raw_lon[3:].decode('ascii', errors='ignore')) / 60
                    # Speed derived from successive positions (vincenty
                    # distance / elapsed time) is disabled; the speed
                    # reported in $GPVTG lines is used instead.
                    self.lat, self.long, self.last_time = lat, lon, timestamp
                except Exception as oops:
                    print(oops)
            else:
                self.activated = False
        elif sentence.startswith(b'$GPVTG'):
            if self.activated:
                print(sentence)
                fields = sentence.split(b',')
                try:
                    gps_speed = float(fields[7].decode('ascii', errors='ignore'))
                    print("speed from gps", gps_speed)
                    for notify in self.callbacks:
                        notify(gps_speed)
                except Exception as oops:
                    print(oops)
        # Any other line is ignored.
        yield from asyncio.sleep(0.1)
def __init__(self, uuid, geojson, search_preset, user_coordinates, timezone_name):
    """Build the object from one GeoJSON feature and the user's position.

    :param uuid: identifier stored as ``self.uuid``
    :param geojson: feature mapping; its ``"geometry"``, ``"id"`` and
        ``"properties"`` entries are read
    :param search_preset: stored as ``self.search_preset``
    :param user_coordinates: indexable pair giving the user's position;
        used for the distance and bearing to the feature
    :param timezone_name: name handed to ``pytz.timezone`` when parsing the
        ``opening_hours`` property

    For a ``"Point"`` geometry the feature is recorded as an OSM ``"node"``
    and its own coordinates are used; any other geometry is treated as a
    ``"LineString"``, recorded as a ``"way"``, and its first vertex is used.
    Coordinates are stored with the two GeoJSON values swapped
    (presumably giving (lat, lon) -- TODO confirm).
    """
    self.uuid = uuid

    geometry = geojson['geometry']
    if geometry['type'] == "Point":
        self.osm_meta = ("node", geojson["id"])
        position = geometry["coordinates"]
    else:  # Should be "LineString".
        self.osm_meta = ("way", geojson["id"])
        position = geometry["coordinates"][0]
    self.coordinates = (position[1], position[0])

    self.user_coords = user_coordinates
    self.search_preset = search_preset
    self.properties = geojson["properties"]
    self.default_address = self.get_default_address()
    self.string_address = ''

    user_position = (user_coordinates[0], user_coordinates[1])
    self.distance = round(
        distance.vincenty(user_position, self.coordinates).m
    )
    self.bearing = get_bearing(user_coordinates, self.coordinates)
    self.direction = deg2dir(self.bearing)

    oh_field = self.properties.get("opening_hours")
    self.opening_hours = None
    # Only "fr" and "en" are accepted; anything else falls back to "en".
    primary_lang = get_language().split('-')[0]
    lang = primary_lang if primary_lang in ["fr", "en"] else "en"

    if oh_field:
        # On any failure the error is logged and opening_hours stays None.
        try:
            self.opening_hours = humanized_opening_hours.HumanizedOpeningHours(
                oh_field, lang, tz=pytz.timezone(timezone_name)
            )
        except humanized_opening_hours.HOHError:
            # TODO : Warn user ?
            message = "Opening hours - HOHError ; OSM_ID: '{id}' ; opening_hours: '{oh}'".format(
                id=self.osm_meta[1],
                oh=oh_field
            )
            debug_logger.error(message)
        except Exception as e:
            # TODO : Warn user ?
            message = "Opening hours - Error ; Exception: '{exception}' ; OSM_ID: '{id}' ; opening_hours: '{oh}'".format(
                exception=str(e),
                id=self.osm_meta[1],
                oh=oh_field
            )
            debug_logger.error(message)

    self.tags = self.get_tags()
    self.renderable_tags = [tag[0] for tag in self.tags]
def compare_coordinates(cls, coords1, coords2, *args, tolerance=None,
                        unit='km', **kwargs):
    """
    Tell whether two coordinate pairs lie within a given distance.

    The distance is computed with geopy (https://pypi.python.org/pypi/geopy)
    using the Vincenty formula; if that formula fails to converge a warning
    is issued and the Great Circle formula is used instead.

    :param coords1: pair of coordinates - a tuple of two numbers
    :param coords2: pair of coordinates - a tuple of two numbers
    :param tolerance: number; when None, ``cls.coordinates_tolerance`` is used
    :param unit: str, one of: 'kilometers', 'km', 'meters',
                              'm', 'miles', 'mi', 'feet',
                              'ft', 'nautical', 'nm'
                 (surrounding whitespace and letter case are ignored)
    :raises ValueError: if ``unit`` is not supported, or if the Vincenty
                        computation fails for another reason
    :rtype: bool

    :Example:

    >>> a, b = (36.1332600, -5.4505100), (35.8893300, -5.3197900)
    >>> MatchBlock.compare_coordinates(a, b, tolerance=20, unit='mi')
    True
    >>> a, b = (36.1332600, -5.4505100), (35.8893300, -5.3197900)
    >>> MatchBlock.compare_coordinates(a, b, tolerance=1, unit='mi')
    False
    """
    max_distance = cls.coordinates_tolerance if tolerance is None else tolerance

    supported_units = {'kilometers', 'km', 'meters', 'm', 'miles', 'mi',
                       'feet', 'ft', 'nautical', 'nm'}
    unit_name = unit.strip().lower()
    if unit_name not in supported_units:
        raise ValueError('unsupported unit')

    try:
        length = getattr(vincenty(coords1, coords2, *args, **kwargs),
                         unit_name)
    except ValueError as err:
        # Only the non-convergence failure triggers the fallback.
        if 'Vincenty formula failed to converge!' not in err.args:
            raise
        warnings.warn(
            'vincenty formula failed, using great circle formula')
        length = getattr(great_circle(coords1, coords2, *args), unit_name)

    return length <= max_distance
def _connector_edges(osm_nodes, transit_nodes, travel_speed_mph=3):
    """
    Build the edges that link each transit node to its nearest OSM node,
    weighted by travel time in minutes.

    A ``nearest_osm_node`` column is written onto ``transit_nodes`` as a
    side effect. Each transit node yields two edges (one per direction)
    with the same weight.

    Parameters
    ----------
    osm_nodes : pandas.DataFrame
        osm nodes DataFrame; must provide ``x`` and ``y`` columns
    transit_nodes : pandas.DataFrame
        transit nodes DataFrame; must provide ``x`` and ``y`` columns
    travel_speed_mph : int, optional
        travel speed used to convert edge distance into travel time.
        units are miles per hour (MPH); for pedestrian travel this is
        assumed to be 3 MPH

    Returns
    -------
    net_connector_edges : pandas.DataFrame
        columns ``from``, ``to``, ``weight`` and ``net_type``
    """
    started_at = time.time()

    transit_nodes['nearest_osm_node'] = _nearest_neighbor(
        osm_nodes[['x', 'y']],
        transit_nodes[['x', 'y']])

    edge_records = []
    for transit_node_id, transit_row in transit_nodes.iterrows():
        # pair the transit node with its nearest pedestrian (OSM) node
        osm_node_id = int(transit_row['nearest_osm_node'])
        osm_row = osm_nodes.loc[osm_node_id]

        transit_point = (transit_row['y'], transit_row['x'])
        osm_point = (osm_row['y'], osm_row['x'])
        miles = vincenty(transit_point, osm_point).miles
        # hours = miles / mph; * 60 converts to minutes
        minutes = miles / travel_speed_mph * 60

        # one edge per direction so the connector is bi-directional
        edge_records.extend([
            (transit_node_id, osm_node_id, minutes, 'transit to osm'),
            (osm_node_id, transit_node_id, minutes, 'osm to transit'),
        ])

    net_connector_edges = pd.DataFrame(
        edge_records, columns=["from", "to", "weight", "net_type"])

    elapsed = time.time() - started_at
    log(
        'Connector edges between the OSM and transit network nodes '
        'successfully completed. Took {:,.2f} seconds'.format(elapsed))

    return net_connector_edges
def factor_polygon_into_circles(polygon, radius_km):
    """
    Cover a Polygon with a grid of equally sized circles.

    Takes a GEOSGeometry Polygon and a radius in kilometers. Returns a list
    of (lng, lat) tuples: the centers of circles of that radius which,
    together, cover the area of the Polygon.

    Raises TypeError if ``polygon`` is not a Polygon and ValueError if
    ``radius_km`` does not exceed the internal safety buffer (0.01).
    """
    safety_buffer = 0.01  # small safety buffer for overlap

    if not isinstance(polygon, Polygon):
        raise TypeError('object is not a Polygon')
    if radius_km <= safety_buffer:
        raise ValueError('radius must be greater than safety buffer')

    radius_m = radius_km * 1000

    # bounds of the polygon, padded by a small fraction of the radius
    bounds = create_buffered_bounds(polygon, radius_m * safety_buffer)

    # distance between neighbouring centers, with a 10 meter safety margin
    spacing_m = calculate_circle_spacing(radius_m, overlap_m=10)

    # geodesic "step" used to move from one center to the next
    step = vincenty(meters=spacing_m)

    # first center: placed so its circle intersects the sw corner of bounds
    origin = vincenty(kilometers=radius_km).destination(
        point=(bounds.s_lat, bounds.w_lng),
        bearing=45
    )

    centers = []
    lat = origin.latitude

    # number of rows of circles needed to span the height of the polygon
    row_count = int(math.ceil(bounds.height_m / spacing_m))
    for _ in range(row_count):
        # every row restarts at the western edge
        lng = origin.longitude

        # the east-west extent depends on the current latitude
        row_width_m = bounds.get_width_at_latitude_m(lat)
        col_count = int(math.ceil(row_width_m / spacing_m))

        for _ in range(col_count):
            centers.append((lng, lat))
            # step east to the next column
            lng = step.destination(point=(lat, lng), bearing=90).longitude

        # step north to the next row
        lat = step.destination(point=(lat, lng), bearing=0).latitude

    return centers
def compute_user_median(data_points, num_iter, csvwriter, current_uid):
    """Estimate one user's geometric median and write it to ``csvwriter``.

    Behaviour is driven by the module-level settings ``LIMIT_POINTS``,
    ``OUTPUT_ALL_USERS``, ``SNAP_TO_USER_POINTS``, ``DISTANCE_THRESHOLD``
    and ``LIMIT_MAD``:

    * With fewer than ``LIMIT_POINTS`` points no median is computed; a
      ``[current_uid, None]`` row is written only if ``OUTPUT_ALL_USERS``.
    * With ``SNAP_TO_USER_POINTS`` the median is the user's own point with
      the lowest ``objfunc`` value.
    * Otherwise the estimate starts at ``cand_median(data_points)`` and is
      refined by up to ``num_iter`` Weiszfeld iterations, stopping early
      once an iteration moves it by less than ``DISTANCE_THRESHOLD`` meters.
    * The median is written (rounded to 6 decimals) only when
      ``check_median_absolute_deviation`` is at most ``LIMIT_MAD``;
      otherwise a ``None`` row is written if ``OUTPUT_ALL_USERS``.

    :param data_points: sequence of points indexable as ``point[0]`` /
        ``point[1]`` (used as latitude / longitude)
    :param num_iter: maximum number of refinement iterations
    :param csvwriter: object exposing ``writerow``
    :param current_uid: identifier written in the first output column
    :return: None
    """
    if len(data_points) < LIMIT_POINTS:
        # Insufficient points for the user - don't record median
        if OUTPUT_ALL_USERS:
            csvwriter.writerow([current_uid, None])
        return

    if SNAP_TO_USER_POINTS:  # ensure median is one of the user's points
        lowest_dev = float("inf")
        for point in data_points:
            tmp_abs_dev = objfunc(point, data_points)
            if tmp_abs_dev < lowest_dev:
                lowest_dev = tmp_abs_dev
                test_median = point
    else:
        # Calculate centroid more or less as starting point
        test_median = cand_median(data_points)
        if objfunc(test_median, data_points) != 0:  # points aren't all the same
            # iterate to find reasonable estimate of median
            for x in range(num_iter):
                denom = denomsum(test_median, data_points)
                next_lat = 0.0
                next_lon = 0.0
                for point in data_points:
                    # one distance computation per point serves both axes
                    weight = numersum(test_median, point)
                    next_lat += (point[0] * weight) / denom
                    next_lon += (point[1] * weight) / denom

                prev_median = test_median
                test_median = (next_lat, next_lon)

                try:
                    step_m = vincenty(prev_median, test_median).meters
                except ValueError:
                    # Vincenty doesn't always converge; fall back on great
                    # circle distance. Only ValueError is caught so that
                    # KeyboardInterrupt and unrelated errors still propagate.
                    step_m = great_circle(prev_median, test_median).meters
                if step_m < DISTANCE_THRESHOLD:
                    break

                if x == num_iter - 1:
                    print('{0}: failed to converge. Last change between iterations was {1} meters.'.format(current_uid, great_circle(prev_median, test_median).meters))

    # Check if user points are under the limit median absolute deviation
    if check_median_absolute_deviation(data_points, test_median) <= LIMIT_MAD:
        csvwriter.writerow([current_uid, (round(test_median[0], 6), round(test_median[1], 6))])
    elif OUTPUT_ALL_USERS:
        csvwriter.writerow([current_uid, None])