def geocode_address(self):
    """
    Set the lat_long field, if the appropriate fields are filled.

    The address comes from ``self.one_line_address()``. When it is non-empty
    it is geocoded with Nominatim and ``self.lat_long`` is set to a
    GeoJSON-style ``Point`` dict (longitude first, then latitude).

    This is best effort: an empty address, a geocoder error, or a lookup
    without a match all leave ``self.lat_long`` untouched. Always returns
    ``None``.
    """
    address = self.one_line_address()
    if not address:
        return

    # Imported lazily so geopy is only required when there is something to
    # look up.
    from geopy.exc import GeopyError
    from geopy.geocoders import Nominatim

    try:
        location = Nominatim().geocode(address)
    except GeopyError:
        # Network/service problems must not break the caller.
        return

    # geocode() yields None when nothing matches; check explicitly instead of
    # relying on a caught AttributeError.
    if location is None:
        return

    self.lat_long = {
        "type": "Point",
        "coordinates": [location.longitude, location.latitude]
    }
    return
python类Nominatim()的实例源码
def coords_for(name):
    """Return bounding box(es) for a place name, ordered for overpass.

    The name is geocoded with Nominatim (requesting GeoJSON geometry) and the
    result is converted depending on its geometry type:

    * ``Polygon``      -> a single ``BoundingBox(south, west, north, east)``
    * ``MultiPolygon`` -> a generator yielding one ``BoundingBox`` per part
    * ``Point``        -> a ``BoundingBox`` built from the ``boundingbox``
      entry of the raw response
    * any other geometry type -> ``None`` (falls through)

    Raises:
        AttributeError: if the lookup found nothing, or the response lacks
            the ``geojson`` / ``boundingbox`` data.
    """
    geocoder = Nominatim()
    location = geocoder.geocode(name, geometry='geojson')
    try:
        # A None location raises AttributeError here, which is handled below.
        geometry = shape(location.raw['geojson'])
        kind = geometry.geom_type
        # Coordinates have to be flipped in order to work in overpass
        if kind == 'Polygon':
            west, south, east, north = geometry.bounds
            return BoundingBox(south, west, north, east)
        if kind == 'MultiPolygon':
            # Iterate the parts through ``.geoms``: iterating the
            # multi-geometry itself is not supported by Shapely 2.x.
            return (
                BoundingBox(*(part.bounds[0:2][::-1] + part.bounds[2:][::-1]))
                for part in geometry.geoms
            )
        if kind == 'Point':
            # The raw 'boundingbox' is read as south, north, west, east.
            south, north, west, east = (
                float(coordinate)
                for coordinate in location.raw['boundingbox']
            )
            return BoundingBox(south, west, north, east)
    except (KeyError, AttributeError):
        raise AttributeError(
            'No bounding box available for this location name.')
StatesTweetCount.py 文件源码
项目:LoveIsInTheAir-Stats-Plotter
作者: sahilsareen
项目源码
文件源码
阅读 38
收藏 0
点赞 0
评论 0
def __init__(self, config_file=DEFAULT_CONFIG_FILE):
    """Load the configuration and prepare geocoder, cache and logging.

    :param config_file: path handed to ``ConfigReader``.
    :raises KeyError: if the configured logging level name is not one of
        DEBUG, INFO, WARN, ERROR, FATAL or CRITICAL.
    """
    self.config = ConfigReader(config_file)
    self.states = {}
    self.geo_locator = Nominatim()
    self.tweet_count = 0
    self.city_cache_appender = CacheAppender(self.config.cache_file_path)

    # Translate the level name from the config into a logging constant.
    level_by_name = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARN': logging.WARNING,
        'ERROR': logging.ERROR,
        'FATAL': logging.FATAL,
        'CRITICAL': logging.CRITICAL,
    }
    configured_level = level_by_name[self.config.logging_level]

    logging.basicConfig(
        format="[%(levelname)s] %(name)s: %(message)s",
        level=configured_level,
    )
    self.logger = logging.getLogger(self.__class__.__name__)
    self.logger.info("Analysing city names using config in %s" % config_file)
def location(name, street, city, latitude=None, longitude=None):
    """Attach a location to the group called ``name`` and persist it.

    When neither ``latitude`` nor ``longitude`` is given (both falsy), the
    coordinates are looked up with Nominatim from ``street + ' ' + city``.
    If that lookup finds nothing, both coordinates fall back to ``0.0``.

    :returns: the updated group, or ``False`` when ``get_sql(name)`` finds
        no group.
    """
    group = get_sql(name)
    if group is None:
        return False

    if not latitude and not longitude:
        geo = Nominatim()
        coord = geo.geocode(street + ' ' + city, timeout=10)
        if coord:
            latitude = coord.latitude
            longitude = coord.longitude
        else:
            # Both coordinates need the fallback; previously only longitude
            # was assigned (twice), leaving latitude unset.
            latitude = 0.0
            longitude = 0.0

    group.location = LocationModel(street, city, latitude, longitude)
    SQL.session.add(group)
    SQL.session.commit()
    return group
def location(name, street, city, latitude=None, longitude=None):
    """Attach a location to the node called ``name`` and persist it.

    If no coordinates are supplied (both falsy) they are resolved through
    Nominatim from ``street + ' ' + city``; an unsuccessful lookup yields
    ``0.0`` for both.

    :returns: the updated node, or ``False`` when ``get_sql(name)`` finds
        no node.
    """
    node = get_sql(name)
    if node is None:
        return False

    if not (latitude or longitude):
        match = Nominatim().geocode(street + ' ' + city, timeout=10)
        if match:
            latitude, longitude = match.latitude, match.longitude
        else:
            latitude, longitude = 0.0, 0.0

    node.location = LocationModel(street, city, latitude, longitude)
    session = SQL.session
    session.add(node)
    session.commit()
    return node
def main():
    """ Only step 1 is given as solution. """
    with open(FILE_NAME, newline='') as csv_file:
        geo_locator = Nominatim()
        rows = csv.reader(csv_file, delimiter=DELIMITER)
        for row in rows:
            # is this a better way of unpacking you can think of?
            # Perhaps using a better data structure than just a tuple?
            (policy_id, *_, point_latitude, point_longitude,
             line, construction, point_granularity) = row
            # print(policy_id, point_latitude, point_longitude)
            try:
                address = geo_locator.reverse(
                    point_latitude + ', ' + point_longitude)
                print(address)
            except Exception as error:
                # Report the failure and carry on with the next record.
                print(error)
                print("Unable to get reverse coordinates")
    return 0
async def mapsearch(cmd, message, args):
    """Reply with an embed linking to a Google Maps search for ``args``.

    The words in ``args`` are joined into a search phrase and geocoded with
    Nominatim. If a location is found the map link is centred on its
    coordinates; otherwise a plain "broad search" link is sent. Missing or
    empty input produces an error embed.

    Declared ``async`` because the reply is awaited; ``await`` is a syntax
    error inside a plain ``def``.

    NOTE(review): the geocoder call is synchronous and runs inside this
    coroutine - confirm that is acceptable for the bot.
    """
    if args:
        search = ' '.join(args)
        search_url = '+'.join(args)
        if search:
            geo_parser = Nominatim()
            location = geo_parser.geocode(search)
            if location:
                lat = location.latitude
                lon = location.longitude
                maps_url = f'https://www.google.rs/maps/search/{search_url}/@{lat},{lon},11z?hl=en'
                response = discord.Embed(color=0xdd4e40)
                response.set_author(name=f'{location}', icon_url=map_icon, url=maps_url)
            else:
                # Nothing geocoded: fall back to a link without coordinates.
                maps_url = f'https://www.google.rs/maps/search/{search_url}'
                response = discord.Embed(color=0xdd4e40)
                response.set_author(name=f'Broad Search: {search.title()}', icon_url=map_icon, url=maps_url)
        else:
            response = discord.Embed(color=0xBE1931, title='? No location inputted.')
    else:
        response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    await message.channel.send(embed=response)
def get(self, request, *args, **kwargs):
    """Return latitude/longitude (4 decimal places) for the ``address`` query.

    Responses:
        200 - ``{'latitude': ..., 'longitude': ...}``
        400 - form validation errors
        404 - the geocoder found no match for the address
        503 - the geocoder timed out
    """
    # Validate request
    form = CoordinatesFromAddressForm(request.GET)
    if not form.is_valid():
        return Response(form.errors, status=status.HTTP_400_BAD_REQUEST)
    address = form.cleaned_data['address']

    # Return lat/lon for address
    geolocator = Nominatim()
    try:
        location = geolocator.geocode(address)
    except GeocoderTimedOut:
        return Response(
            "Geocoder service currently unavailable. Please try again later.",
            status=status.HTTP_503_SERVICE_UNAVAILABLE
        )

    # geocode() yields None when nothing matches; answer with 404 rather
    # than failing with an AttributeError (HTTP 500).
    if location is None:
        return Response(
            "No coordinates found for the given address.",
            status=status.HTTP_404_NOT_FOUND
        )

    return Response({
        'latitude': float("{0:.4f}".format(location.latitude)),
        'longitude': float("{0:.4f}".format(location.longitude)),
    })
def __init__(self, *commands, token=None, prefixes=None, strict=False):
    """Answers with a weather in user's city or on specified address.

    :param token: Dark Sky API token (required).
    :raises ValueError: when ``token`` is missing or empty.
    """
    if not token:
        raise ValueError("Token is not specified! Get it from: https://darksky.net")

    super().__init__(*commands, prefixes=prefixes, strict=strict)

    self.token = token

    # Display text per forecast icon name.
    self.icons = {
        "clear-day": "??",
        "clear-night": "??",
        "cloudy": "??",
        "fog": "??",
        "partly-cloudy-day": "??",
        "partly-cloudy-night": "??",
        "rain": "??",
        "sleet": "?? ??",
        "snow": "??",
        "wind": "??",
        "error": "??",
    }

    # One instance of every geocoder backend, in this order.
    self.geocoders = [backend() for backend in (Photon, Yandex, Nominatim)]

    self.coords_cache = {}
    self.weather_cache = {}
    half_day = 12 * 60 * 60
    self.weather_clear = time.time() + half_day

    self.api_lim = 95
    full_day = 24 * 60 * 60
    self.api_lim_clear = time.time() + full_day
    self.api_lim_count = 0
def coordinates(street, city):
    """Geocode ``street + ' ' + city`` and emit the result.

    Emits ``'coordinates'`` with a ``(latitude, longitude)`` tuple on
    success, otherwise ``'error'`` on the ``/general`` namespace.
    """
    geocoder = Nominatim()
    match = geocoder.geocode(street + ' ' + city)
    if not match:
        emit('error', "Coordinates not found", namespace='/general')
        return
    emit('coordinates', (match.latitude, match.longitude))
def coordinates(street, city):
    """Geocode ``street + ' ' + city`` and emit the result.

    Emits ``'coordinates'`` with a ``(latitude, longitude)`` tuple on
    success, otherwise a ``'warning'`` on the ``/general`` namespace.

    :returns: always ``True``.
    """
    geo = Nominatim()
    geocords = geo.geocode(street + ' ' + city)
    if geocords:
        emit('coordinates', (geocords.latitude, geocords.longitude))
    else:
        # Message text corrected from the garbled "Coordinated no found".
        emit("warning", "Coordinates not found", namespace='/general')
    return True
def main():
    """Interactive entry point: build a crime heat map for a location.

    Each round prints the banner, asks until the user answers y/n, resolves
    the location, fetches the data, then renders the map and the output.
    Afterwards the user may run another round.

    Reruns happen in a loop instead of calling ``main()`` recursively, so the
    call stack does not grow with every round. The former bare ``SystemExit``
    expression, which had no effect, is gone; the function simply returns.
    """
    geolocator = Nominatim()
    while True:
        print("This app will generate a crime heat map based on your location")

        # Keep asking until the answer is y or n (case-insensitive).
        introText = ""
        while introText.lower() not in ("y", "n"):
            introText = input("Would you like to enter a location (y/n) ")
        locationInfo = validLocation(geolocator, introText)

        raw_data = apiResults(locationInfo)
        mapMaker(raw_data, locationInfo)
        output(raw_data, locationInfo)

        againText = ""
        while againText.lower() not in ("y", "n"):
            againText = input("Would you like to run the application again (y/n) ")

        if againText.lower() != "y":
            print("Thanks! See you next time")
            return
def _geolocate_nominatim(self, string):
    """Geocode ``string`` via Nominatim and record the hit, if any.

    Nothing is recorded when the lookup has no result or the first element
    of the result is falsy.
    """
    geocoder = Nominatim(timeout=30)
    match = geocoder.geocode(query=string, exactly_one=True)
    if not match or not match[0]:
        return
    self.add_potential_georeference(
        long=match.longitude,
        lat=match.latitude,
        profile_name='OpenStreetMap',
        locality_name=match.address,
    )
def onChangeLoc(self, event):
    '''
    Change location

    Prompts for a place name until one can be geocoded, then stores it in
    ``self.location`` / ``self.latlon`` and pushes the coordinates to the
    session. Cancelling the dialog leaves the current location unchanged and
    returns immediately; previously a cancelled dialog was geocoded as
    ``None`` and the prompt could never be dismissed.
    '''
    geolocator = Nominatim()
    while True:
        dlg = wx.TextEntryDialog(self.frame, 'Please enter a location', 'Current location: ' + self.location)
        if dlg.ShowModal() == wx.ID_OK:
            loc = str(dlg.GetValue())
        else:
            # dialog cancelled or ended by some other button
            loc = None
        dlg.Destroy()

        if loc is None:
            # Keep the existing location and do not touch the session.
            return

        # Look up location given
        try:
            l = geolocator.geocode(loc, exactly_one=True)
        except Exception as e:
            print('Error setting location\n' + str(e))
            continue

        if l is None:
            # No match for the entered text; ask again.
            print('Error setting location\n' + 'No match found for ' + loc)
            continue

        self.latlon = (l.latitude, l.longitude)
        self.location = loc
        break

    self.session.update_location(self.latlon[0], self.latlon[1])
def _get_default_pool_members():
    """Return the default geocoder pool: a Nominatim and a GoogleV3 instance."""
    from geopy.geocoders import GoogleV3, Nominatim

    nominatim = Nominatim()
    google = GoogleV3()
    return (nominatim, google)
def __init__(self):
    """Create the Nominatim geocoder (5 s timeout, country bias 'fr')."""
    options = {'timeout': 5, 'country_bias': 'fr'}
    self._geocoder = geocoders.Nominatim(**options)
Google_Place_Nearby_Search.py 文件源码
项目:Hanhan_Play_With_Social_Media
作者: hanhanwu
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def calculate_distance(merchant_loc, user_loc):
    """Return the distance in miles between a merchant and a user location.

    :param merchant_loc: comma-separated coordinate string; the first two
        values are used as the merchant point.
    :param user_loc: free-text location, geocoded with Nominatim.
    :returns: the ``vincenty`` distance in miles, or ``None`` when
        ``user_loc`` cannot be geocoded.
    """
    geolocator = Nominatim()
    merchant_lat_lng = [Decimal(part) for part in merchant_loc.split(',')]
    al1 = (merchant_lat_lng[0], merchant_lat_lng[1])

    location2 = geolocator.geocode(user_loc)
    # Identity check is the idiomatic test for "no result".
    if location2 is None:
        return None

    al2 = (location2.latitude, location2.longitude)
    return vincenty(al1, al2).miles
Foursquare_search.py 文件源码
项目:Hanhan_Play_With_Social_Media
作者: hanhanwu
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def get_ll(postal_code):
    """Return ``"<latitude>, <longitude>"`` for a postal code.

    :returns: the coordinate string, or ``None`` when ``postal_code`` is
        ``None`` or the geocoder finds no match.
    """
    if postal_code is None:
        return None

    geolocator = Nominatim()
    # it seems that cannot write abbreviation here
    location = geolocator.geocode(postal_code)
    if location is None:
        return None

    return str(location.latitude) + ', ' + str(location.longitude)
def change_location(session, location_name):
    """Geocode ``location_name`` and move ``session`` to those coordinates.

    Prints a progress line before and after the update.
    """
    target = Nominatim().geocode(location_name)
    print('** changing location to {0}'.format(target.address))
    latitude, longitude = target.latitude, target.longitude
    session.update_location(latitude, longitude)
    print('** location changed')
# make yourself undiscoverable
def geohash(latitude, longitude):
    """Encode the coordinates (converted to float) with ``Geohash.encode``."""
    lat = float(latitude)
    lon = float(longitude)
    return Geohash.encode(lat, lon)
# Cache responses from Nominatim for 1 month
def __init__(self, source, key):
    """Pick the geocoder backend and reset the location fields.

    ``source`` 'Google' selects GoogleV3 (using ``key``), 'Nominatim'
    selects Nominatim; any other value leaves ``self.geolocator`` unset.
    """
    if source == 'Google':
        self.geolocator = GoogleV3(api_key=key)
    elif source == 'Nominatim':
        self.geolocator = Nominatim()

    # All location fields start out as empty strings.
    self.latitude = self.longitude = ""
    self.city = self.state = self.country = ""

    # Make utf8 the process-wide default string encoding.
    reload(sys)
    sys.setdefaultencoding('utf8')
def get_latlon(self, city):
    """Return ``(latitude, longitude)`` for ``city``.

    Any failure (geocoder error, no match, ...) yields the sentinel
    ``(-1, -1)``. Only ``Exception`` subclasses are swallowed, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """
    try:
        geolocator = Nominatim()
        location = geolocator.geocode(city)
        # A None result raises AttributeError here and is mapped to the
        # sentinel below.
        return location.latitude, location.longitude
    except Exception:
        return -1, -1
def getaddress():
    """Reverse-geocode the outer-scope ``lat``/``lng`` and return the address."""
    from geopy.geocoders import Nominatim

    geocoder = Nominatim()
    query = "%s,%s" % (lat, lng)
    return geocoder.reverse(query).address
def dispatch(self, request, *args, **kwargs):
    """Read filter/sort parameters from the query string, then dispatch.

    Stores genre, distance, location, lat and lon on the view, resolves the
    sort order (defaulting to 'recent'), and geocodes ``location`` when one
    is given. A geocoder timeout is recorded in ``self.geocoder_failed``
    instead of being raised.
    """
    params = request.GET

    # Filtering
    self.active_genre = params.get('genre', 'All Genres')
    self.distance = params.get('distance')
    self.location = params.get('location')
    self.lat = params.get('lat')
    self.lon = params.get('lon')

    # Sorting
    sort_slug = params.get('sort')
    if sort_slug not in self.ORDER_BY_NAME:
        sort_slug = 'recent'
    self.order_by = {
        'slug': sort_slug,
        'name': self.ORDER_BY_NAME[sort_slug],
    }

    # Geolocate if location
    self.location_coordinates = None
    self.geocoder_failed = False
    if self.location:
        geocoder = Nominatim()
        try:
            self.location_coordinates = geocoder.geocode(self.location)
        except GeocoderTimedOut:
            self.geocoder_failed = True

    parent = super(ArtistListView, self)
    return parent.dispatch(request, *args, **kwargs)
def validate_location(ctx, param, value):
    """Click callback: turn a location name into a geocoded location.

    ``None`` is passed through unchanged. Raises ``click.BadParameter`` when
    the geocoder finds no match for ``value``.
    """
    if value is not None:
        match = Nominatim().geocode(value)
        if match is None:
            raise click.BadParameter('Location "%s" could not be found' % value)
        return match
    return value
def latlong(opt):
    # Refresh user coordinates and regenerate the static map template.
    #
    # opt is read for two keys: 'reload' (process every user that has a
    # location, not only those flagged as changed) and 'retry' (passed
    # through to getLatLong).
    #
    # NOTE(review): this block is Python 2 code (print statement, encoded
    # bytes written to a file opened with 'w').
    # NOTE: the locals `reload` and `map` shadow builtins within this function.
    reload, retry = opt['reload'], opt['retry']
    # Users with a non-null, non-empty location.
    map = models.UserPreferences.objects.filter(location__isnull=False).exclude(location__exact='')
    if not reload:
        # Without reload, restrict to users whose location changed.
        map = map.filter(location_changed__exact=True)
    geolocator = Nominatim()
    for user in map:
        getLatLong(geolocator, user, retry)
    # Second pass: every user that has a latitude goes into the template.
    map = models.UserPreferences.objects.filter(latitude__isnull=False).select_related('user')
    # Template header, ending right after the opening of the JS `addresses` array.
    mapcache = '{# this file is generated, do not edit it #}{% extends "base.html" %}{% load i18n %}{% load l10n %}{% block title %}{% trans "Map" %}{% endblock %}{% block content %}<div id="map"></div>{% endblock %}{% block afterjs %}{% localize off %}<script>var center=new google.maps.LatLng({% if center %}{{ center.latitude }},{{ center.longitude }}{% else %}30,0{% endif %});var zoom={% if zoom %}{{ zoom }}{% else %}2{% endif %};var addresses = ['
    for u in map:
        try:
            # One JS object literal per user; the braces are injected through
            # the open/close arguments so str.format leaves them alone.
            mapcache += '{open}"username": "{username}","avatar": "{avatar}","location": "{location}","icon": "{icon}","latlong": new google.maps.LatLng({latitude},{longitude}){close},'.format(
                open='{',
                username=escape(u.user.username),
                avatar=escape(models.avatar(u.user)),
                location=escape(u.location),
                icon=escape(u.favorite_character1_image if u.favorite_character1_image else SITE_STATIC_URL + 'static/img/default_map_icon.png'),
                latitude=u.latitude,
                longitude=u.longitude,
                close='}',
            )
        except:
            # Any failure while formatting a user skips that user.
            # NOTE(review): the bare except also hides unexpected errors.
            print 'One user not added in map'
    # Close the array and the template blocks.
    mapcache += '];</script><script src="' + SITE_STATIC_URL + 'static/js/map.js"></script>{% endlocalize %}{% endblock %}'
    with open(django_settings.BASE_DIR + '/' + django_settings.SITE + '/templates/pages/map.html', 'w') as f:
        f.write(mapcache.encode('UTF-8'))
    # Redundant: the with-statement has already closed the file.
    f.close()
async def weather(cmd, message, args):
    """Reply with the current weather for the location given in ``args``.

    Flow: check that ``cmd.cfg`` holds a ``secret_key``, split ``args`` into
    search text and unit, geocode the search text with Nominatim, fetch the
    Dark Sky forecast for the coordinates and render it as an embed. Every
    failure path sends a short explanatory embed instead.

    Declared ``async`` because the body uses ``await`` / ``async with``,
    which are syntax errors inside a plain ``def``.
    """
    if 'secret_key' in cmd.cfg:
        secret_key = cmd.cfg['secret_key']
        if args:
            search, unit = get_unit_and_search(args)
            if search:
                geo_parser = Nominatim()
                location = geo_parser.geocode(search)
                if location:
                    lat = location.latitude
                    lon = location.longitude
                    req_url = f'https://api.darksky.net/forecast/{secret_key}/{lat},{lon}?units={unit}'
                    async with aiohttp.ClientSession() as session:
                        async with session.get(req_url) as reply:
                            search_data = await reply.read()
                    data = json.loads(search_data)
                    curr = data['currently']
                    icon = curr['icon']
                    forecast = data['daily']['summary']
                    dis, deg = get_dis_and_deg(unit, forecast)
                    forecast_title = f'{icons[icon]["icon"]} {curr["summary"]}'
                    response = discord.Embed(color=icons[icon]['color'], title=forecast_title)
                    response.description = f'Location: {location}'
                    response.add_field(name='?? Forecast', value=forecast, inline=False)
                    # Temperature field
                    info_title = '?? Temperature'
                    info_text = f'Temperature: {curr["temperature"]}{deg}'
                    info_text += f'\nFeels Like: {curr["apparentTemperature"]}{deg}'
                    info_text += f'\nDew Point: {curr["dewPoint"]}{deg}'
                    response.add_field(name=info_title, value=info_text, inline=True)
                    # Wind field
                    wind_title = '?? Wind'
                    wind_text = f'Speed: {curr["windSpeed"]} {dis}/H'
                    wind_text += f'\nGust: {curr["windGust"]} {dis}/H'
                    wind_text += f'\nBearing: {curr["windBearing"]}°'
                    response.add_field(name=wind_title, value=wind_text, inline=True)
                    # Remaining readings; visibility is optional in the payload.
                    other_title = '?? Other'
                    other_text = f'Humidity: {curr["humidity"]*100}%'
                    other_text += f'\nPressure: {curr["pressure"]}mbar'
                    if 'visibility' in curr:
                        other_text += f'\nVisibility: {curr["visibility"]} {dis}'
                    else:
                        other_text += '\nVisibility: Unknown'
                    response.add_field(name=other_title, value=other_text, inline=True)
                else:
                    response = discord.Embed(color=0x696969, title='?? Location not found.')
            else:
                response = discord.Embed(color=0xBE1931, title='? No location inputted.')
        else:
            response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    else:
        response = discord.Embed(color=0xBE1931, title='? The API Key is missing.')
    await message.channel.send(embed=response)
async def weather(cmd, message, args):
    """Reply with the current weather for the location given in ``args``.

    Requires ``secret_key`` in ``cmd.cfg``. The arguments are split into a
    search phrase and a unit, the phrase is geocoded with Nominatim, and the
    Dark Sky forecast for those coordinates is rendered as an embed. Missing
    key, missing input, empty search and unknown location each produce a
    short explanatory embed instead.

    Declared ``async`` because the body uses ``await`` / ``async with``,
    which are syntax errors inside a plain ``def``.
    """
    if 'secret_key' not in cmd.cfg:
        response = discord.Embed(color=0xBE1931, title='? The API Key is missing.')
    elif not args:
        response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
    else:
        secret_key = cmd.cfg['secret_key']
        search, unit = get_unit_and_search(args)
        location = Nominatim().geocode(search) if search else None
        if not search:
            response = discord.Embed(color=0xBE1931, title='? No location inputted.')
        elif not location:
            response = discord.Embed(color=0x696969, title='?? Location not found.')
        else:
            lat = location.latitude
            lon = location.longitude
            req_url = f'https://api.darksky.net/forecast/{secret_key}/{lat},{lon}?units={unit}'
            async with aiohttp.ClientSession() as session:
                async with session.get(req_url) as reply:
                    search_data = await reply.read()
            data = json.loads(search_data)
            curr = data['currently']
            icon = curr['icon']
            forecast = data['daily']['summary']
            dis, deg = get_dis_and_deg(unit, forecast)

            forecast_title = f'{icons[icon]["icon"]} {curr["summary"]}'
            response = discord.Embed(color=icons[icon]['color'], title=forecast_title)
            response.description = f'Location: {location}'
            response.add_field(name='?? Forecast', value=forecast, inline=False)

            # Temperature field
            info_text = '\n'.join((
                f'Temperature: {curr["temperature"]}{deg}',
                f'Feels Like: {curr["apparentTemperature"]}{deg}',
                f'Dew Point: {curr["dewPoint"]}{deg}',
            ))
            response.add_field(name='?? Temperature', value=info_text, inline=True)

            # Wind field
            wind_text = '\n'.join((
                f'Speed: {curr["windSpeed"]} {dis}/H',
                f'Gust: {curr["windGust"]} {dis}/H',
                f'Bearing: {curr["windBearing"]}°',
            ))
            response.add_field(name='?? Wind', value=wind_text, inline=True)

            # Remaining readings; visibility is optional in the payload.
            if 'visibility' in curr:
                visibility = f'Visibility: {curr["visibility"]} {dis}'
            else:
                visibility = 'Visibility: Unknown'
            other_text = '\n'.join((
                f'Humidity: {curr["humidity"]*100}%',
                f'Pressure: {curr["pressure"]}mbar',
                visibility,
            ))
            response.add_field(name='?? Other', value=other_text, inline=True)
    await message.channel.send(embed=response)
def get_air_quality(request):
    """Fill the conversation context with air-quality data for a location.

    Reads ``context``, ``entities`` and ``session_id`` from ``request``.
    All result keys are cleared from the context first, then exactly one of
    these outcomes is written:

    * ``missingLocation`` - no location entity was supplied
    * ``notFound``        - the location could not be geocoded, even with
      ", Germany" appended
    * otherwise the closest station (``location``, ``station``,
      ``stationId``), the subscription state (``subscribed`` or
      ``notSubscribed``) and either the latest alert of the last two days
      (``lastAlertValue``, ``lastAlertDate``, ``kind``) or ``clean``

    :returns: the updated context (the same object as ``request['context']``).
    """
    geolocator = Nominatim()
    context = request['context']
    entities = request['entities']
    query = first_entity_value(entities, 'location')

    # Drop the results of any previous turn.
    for context_key in (
        'missingLocation',
        'outOfGermany',
        'subscribed',
        'notSubscribed',
        'location',
        'station',
        'stationId',
        'lastAlertValue',
        'lastAlertDate',
        'kind',
        'clean',
        'notFound',
    ):
        context.pop(context_key, None)

    if not query:
        context['missingLocation'] = True
        return context

    loc = geolocator.geocode(query, language='en')
    if not loc:
        # Retry with the country appended. The original query text is kept
        # in its own variable: reusing `loc` here would format the failed
        # result (None) and search for "None, Germany".
        loc = geolocator.geocode('{}, Germany'.format(query), language='en')

    if not loc:
        context['notFound'] = True
        return context

    closest_station = get_closest_station(loc.latitude, loc.longitude)

    # is subscribed ?
    if is_subscribed(request['session_id'], closest_station):
        context['subscribed'] = True
    else:
        context['notSubscribed'] = True

    # oldest alert we want
    max_date = datetime.datetime.now() - datetime.timedelta(days=2)
    last_alert = Alert.objects.filter(station=closest_station, report__date__gte=max_date).last()

    context['location'] = loc.address
    context['station'] = closest_station.name
    context['stationId'] = closest_station.pk
    cache.set(request['session_id'], closest_station.pk)

    if last_alert:
        context['lastAlertValue'] = last_alert.value
        context['lastAlertDate'] = last_alert.report.date.strftime('%x')
        context['kind'] = last_alert.report.kind
    else:
        context['clean'] = True

    return context
# Setup Actions