UpnpPunch.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:zeronet-debian 作者: bashrc 项目源码 文件源码
def open_port(port=15441, desc="UpnpPunch"):
    """
    Attempt to forward a port using UPnP.

    Candidate local addresses are gathered from ``_get_local_ip()``, from
    resolving the empty hostname, and from the source address of a UDP
    socket connected towards 8.8.8.8. Each distinct address is tried up to
    three times: an SSDP search is issued from it, the gateway profile is
    fetched and parsed, and one SOAP request per protocol (TCP and UDP) is
    sent concurrently.

    :param port: port number passed to the SOAP message for both protocols.
    :param desc: description string passed to the SOAP message.
    :return: True as soon as both SOAP requests of one attempt yield a
        truthy result within the 3 second join timeout; False when every
        attempt has failed.
    """

    local_ips = [_get_local_ip()]
    try:
        # Get ip by '' hostname not supported on all platform
        local_ips += socket.gethostbyname_ex('')[2]
    except Exception as err:
        # Best effort only: keep going with the addresses found so far.
        # Catching Exception (not a bare except) lets KeyboardInterrupt,
        # SystemExit and greenlet kill/timeout signals propagate.
        logging.debug("Local ip lookup by hostname failed: %s", err)

    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Using google dns route: connect() on a datagram socket only
            # sets the default peer, so getsockname() then reports the
            # local address selected for that route.
            s.connect(('8.8.8.8', 0))
            local_ips.append(s.getsockname()[0])
        finally:
            # Always release the probe socket, even when connect() fails.
            s.close()
    except Exception as err:
        logging.debug("Local ip lookup by route failed: %s", err)

    # Delete duplicates while keeping discovery order, so the attempt
    # order is deterministic between runs.
    unique_ips = []
    for candidate in local_ips:
        if candidate not in unique_ips:
            unique_ips.append(candidate)
    local_ips = unique_ips

    logging.debug("Found local ips: %s", local_ips)
    local_ips = local_ips * 3  # Retry every ip 3 times

    for local_ip in local_ips:
        logging.debug("Trying using local ip: %s", local_ip)
        igd_response = _m_search_ssdp(local_ip)

        if not igd_response:
            logging.debug("No IGD response")
            continue

        location = _retrieve_location_from_ssdp(igd_response)

        if not location:
            logging.debug("No location")
            continue

        parsed = _parse_igd_profile(
            _retrieve_igd_profile(location)
        )

        if not parsed:
            logging.debug("IGD parse error using location %r", location)
            continue

        control_url, upnp_schema = parsed

        # One mapping request per transport protocol.
        soap_messages = [
            _create_soap_message(local_ip, port, desc, proto, upnp_schema)
            for proto in ['TCP', 'UDP']
        ]

        # Send both requests concurrently and wait at most 3 seconds.
        requests = [
            gevent.spawn(
                _send_soap_request, location, upnp_schema, control_url, message
            )
            for message in soap_messages
        ]

        gevent.joinall(requests, timeout=3)

        # A greenlet that has not finished (or that failed) has a falsy
        # value, so success requires both requests to have completed.
        if all(request.value for request in requests):
            return True
    return False
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号