def open_port(port=15441, desc="UpnpPunch", retries=3):
    """
    Attempt to forward a port using UPnP.

    Candidate local addresses are collected from three sources: the
    module's ``_get_local_ip()`` helper, a lookup of the '' hostname, and
    the source address of a UDP socket connected towards 8.8.8.8. For each
    candidate the function runs the discovery helper, extracts the profile
    location from its response, retrieves and parses the IGD profile, and
    then sends one SOAP request for TCP and one for UDP concurrently.

    Args:
        port: Port number handed to the SOAP message builder.
        desc: Description string handed to the SOAP message builder.
        retries: How many times the whole list of local addresses is
            walked. Defaults to 3; a value of 0 or less means no attempt
            is made.

    Returns:
        True as soon as both SOAP requests for one address finish with a
        truthy result within 3 seconds, otherwise False.
    """
    candidates = [_get_local_ip()]

    # Address discovery is best-effort: a failing source is logged and
    # skipped, but interpreter-level exceptions are no longer swallowed.
    try:
        # Resolving the '' hostname is not supported on all platforms
        candidates += socket.gethostbyname_ex('')[2]
    except Exception as err:
        logging.debug("Local ip lookup by hostname failed: %s", err)

    try:
        probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            probe.connect(('8.8.8.8', 0))  # Using google dns route
            candidates.append(probe.getsockname()[0])
        finally:
            # Always release the descriptor, even if connect() fails
            probe.close()
    except Exception as err:
        logging.debug("Local ip lookup by route failed: %s", err)

    # Delete duplicates while keeping discovery order, so the order in
    # which addresses are tried is the same on every run
    local_ips = []
    for candidate in candidates:
        if candidate not in local_ips:
            local_ips.append(candidate)
    logging.debug("Found local ips: %s", local_ips)

    # Repeating the list retries every address `retries` times, cycling
    # through all addresses before returning to the first one
    for local_ip in local_ips * retries:
        logging.debug("Trying using local ip: %s", local_ip)

        idg_response = _m_search_ssdp(local_ip)
        if not idg_response:
            logging.debug("No IGD response")
            continue

        location = _retrieve_location_from_ssdp(idg_response)
        if not location:
            logging.debug("No location")
            continue

        parsed = _parse_igd_profile(_retrieve_igd_profile(location))
        if not parsed:
            logging.debug("IGD parse error using location %s", repr(location))
            continue

        control_url, upnp_schema = parsed
        soap_messages = [
            _create_soap_message(local_ip, port, desc, proto, upnp_schema)
            for proto in ['TCP', 'UDP']
        ]
        requests = [
            gevent.spawn(
                _send_soap_request, location, upnp_schema, control_url, message
            )
            for message in soap_messages
        ]
        # A request that has not finished within the timeout still has
        # value None and therefore counts as a failure below
        gevent.joinall(requests, timeout=3)

        if all(request.value for request in requests):
            return True

    return False
评论列表
文章目录