python类IPPROTO_IP的实例源码

client.py 文件源码 项目:opendxl-client-python 作者: opendxl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _socketpair_compat():
    """TCP/IP socketpair including Windows support"""
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listensock.bind(("localhost", 0))
    listensock.listen(1)

    iface, port = listensock.getsockname()
    sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
    sock1.setblocking(0)
    try:
        sock1.connect(("localhost", port))
    except socket.error as err:
        if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN:
            raise
    sock2, address = listensock.accept()
    sock2.setblocking(0)
    listensock.close()
    return (sock1, sock2)
htx.py 文件源码 项目:virtual-hue 作者: flok99 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _startSSDPNotifier(addr):
    msgHeader = [
            'NOTIFY * HTTP/1.1\r\n',
            'HOST: 239.255.255.250:1900\r\n',
            "NTS: ssdp:alive\r\n"
        ]

    msg = ''.join(msgHeader) + gen_ssdp_content(addr, 'NT')

    while True:
        print "Sending M-NOTIFY..."
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
        sock.sendto(msg, (ssdp_addr, ssdp_port))
        sock.close()
        del sock

        time.sleep(6)
__init__.py 文件源码 项目:PyXiaomiGateway 作者: Danielhiversen 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _create_mcast_socket(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        if self._interface != 'any':
            if platform.system() != "Windows":
                sock.bind((self.MULTICAST_ADDRESS, self.MULTICAST_PORT))
            else:
                sock.bind((self._interface, self.MULTICAST_PORT))

            mreq = socket.inet_aton(self.MULTICAST_ADDRESS) + socket.inet_aton(self._interface)
        else:
            if platform.system() != "Windows":
                sock.bind((self.MULTICAST_ADDRESS, self.MULTICAST_PORT))
            else:
                sock.bind(('', self.MULTICAST_PORT))
            mreq = struct.pack("4sl", socket.inet_aton(self.MULTICAST_ADDRESS), socket.INADDR_ANY)

        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        return sock
client.py 文件源码 项目:aws-iot-device-sdk-python 作者: aws 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _socketpair_compat():
    """TCP/IP socketpair including Windows support"""
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listensock.bind(("127.0.0.1", 0))
    listensock.listen(1)

    iface, port = listensock.getsockname()
    sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
    sock1.setblocking(0)
    try:
        sock1.connect(("127.0.0.1", port))
    except socket.error as err:
        if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN:
            raise
    sock2, address = listensock.accept()
    sock2.setblocking(0)
    listensock.close()
    return (sock1, sock2)
asyncUdp.py 文件源码 项目:pyserver 作者: juhgiyo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_mtu_size(self, hostname, port):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(hostname, port)
        s.setsockopt(socket.IPPROTO_IP, IP_MTU_DISCOVER, IP_PMTUDISC_DO)

        max_mtu = self.MAX_MTU
        try:
            s.send('#' * max_mtu)
        except socket.error:
            option = getattr(socket.IPPROTO_IP, 'IP_MTU', 14)
            max_mtu = s.getsockopt(socket.IPPROTO_IP, option)
        return max_mtu

# Echo udp server test
# def readHandle(sock,addr, data):
#   sock.send(addr[0],addr[1],data)
# server=AsyncUDP(5005,readHandle)
discovery.py 文件源码 项目:pyvizio 作者: vkorn 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def discover(service, timeout=5, retries=1, mx=3):
    group = ("239.255.255.250", 1900)
    message = "\r\n".join([
        'M-SEARCH * HTTP/1.1',
        'HOST: {0}:{1}',
        'MAN: "ssdp:discover"',
        'ST: {st}', 'MX: {mx}', '', ''])
    socket.setdefaulttimeout(timeout)
    responses = {}
    for _ in range(retries):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        message_bytes = message.format(*group, st=service, mx=mx).encode('utf-8')
        sock.sendto(message_bytes, group)
        while True:
            try:
                response = SSDPResponse(sock.recv(1024))
                responses[response.location] = response
            except socket.timeout:
                break
        return list(responses.values())
optirx.py 文件源码 项目:hmv-s16 作者: cmuphyscomp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def mkdatasock(ip_address=None, multicast_address=MULTICAST_ADDRESS, port=PORT_DATA):
    "Create a data socket."
    ip_address = gethostip() if not ip_address else ip_address
    datasock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    datasock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    datasock.bind((ip_address, port))
    # join a multicast group
    mreq = struct.pack("=4sl", socket.inet_aton(multicast_address), socket.INADDR_ANY)
    datasock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    datasock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE)
    return datasock
send_mocap_packets.py 文件源码 项目:hmv-s16 作者: cmuphyscomp 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def make_data_sender_socket(ip_address=None):
    """Create a socket for sending multicast data."""
    ip_address = gethostip() if not ip_address else ip_address
    datasock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
    datasock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    datasock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    datasock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20)

    # print("Binding source socket to %s" % ip_address)
    datasock.bind((ip_address, 0))
    return datasock
twampy.py 文件源码 项目:twampy 作者: nokia 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def connect(self, server="", port=862, tos=0x88):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
        self.socket.connect((server, port))
udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getOutgoingInterface(self):
        i = self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)
        return socket.inet_ntoa(struct.pack("@i", i))
udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _setInterface(self, addr):
        i = socket.inet_aton(addr)
        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, i)
        return 1
udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getLoopbackMode(self):
        return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP)
udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setLoopbackMode(self, mode):
        mode = struct.pack("b", operator.truth(mode))
        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, mode)
udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def getTTL(self):
        return self.socket.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL)
udp.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def _joinAddr2(self, interface, addr, join):
        addr = socket.inet_aton(addr)
        interface = socket.inet_aton(interface)
        if join:
            cmd = socket.IP_ADD_MEMBERSHIP
        else:
            cmd = socket.IP_DROP_MEMBERSHIP
        try:
            self.socket.setsockopt(socket.IPPROTO_IP, cmd, addr + interface)
        except socket.error, e:
            return failure.Failure(error.MulticastJoinError(addr, interface, *e.args))
protocol.py 文件源码 项目:pyaqara 作者: javefang 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _add_membership(self):
        """private: add multicast membership"""
        _LOGGER.debug("Joining multicast group...")
        sock = self.transport.get_extra_info("socket")
        group = socket.inet_aton(MCAST_ADDR)
        mreq = struct.pack("4sL", group, socket.INADDR_ANY)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _LOGGER.debug("Multicast membership added")
listener.py 文件源码 项目:pyNet 作者: mcvavy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, thisNode):
        logging.info('Initializing listener')
        self.thisNode = thisNode
        self.MCAST_PORT = 4242
        self.MCAST_GRP = '192.168.1.255'

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        self.sock.bind(('', self.MCAST_PORT))
rpc-jet.py 文件源码 项目:vmx-docker-lwaftr 作者: Juniper 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
        self.sock.setsockopt(socket.IPPROTO_IP, 109,1)
        self.sock.connect((self.host, self.port))
ssdp.py 文件源码 项目:panonoctl 作者: florianl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def discover(self):
        ws      = None
        usn     = None
        apiV    = None
        srv     = None
        req =  ('M-SEARCH * HTTP/1.1\r\n' +
                'MX: 10\r\n' +
                'HOST: 239.255.255.250:1900\r\n' +
                'MAN: \"ssdp:discover\"\r\n' +
                'NT: panono:ball-camera\r\n' +
                '\r\n')
        ws = None
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.settimeout(7)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        mcast = struct.pack('4sL', socket.inet_aton('239.255.255.250') , socket.INADDR_ANY)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mcast)
        sock.bind(('', 1900))
        try:
            sock.sendto( req.encode(), ('239.255.255.250', 1900))
        except socket.error as e:
            print(e)
            return (None, None, None)
        for _ in range(5):
            try:
                data, addr = sock.recvfrom(1024)
                if not data: continue
                ws = ssdpNotify().getLocation(data)
                if ws is None: continue
                usn = ssdpNotify().getUsn(data)
                apiV = ssdpNotify().getApiVersion(data)
                srv = ssdpNotify().getSrv(data)
                break
            except socket.error as e:
                print(e)
                break
        sock.close()
        return (ws, usn, apiV, srv)
mihome.py 文件源码 项目:goodbye-mihome 作者: aluminiumgeek 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def receiver(service='mihome'):
    from plugins import gateway

    assert service in MULTICAST, 'No such service'
    store = get_store()
    address, port = MULTICAST.get(service)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("0.0.0.0", port))

    mreq = struct.pack("=4sl", socket.inet_aton(address), socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    current = {}

    while True:
        data, _ = sock.recvfrom(SOCKET_BUFSIZE)  # buffer size is 1024 bytes
        print(datetime.now().isoformat(), data)
        if service == 'mihome':
            message = json.loads(data.decode())
            data = json.loads(message['data'])
            if message.get('model') in ('sensor_ht', 'weather.v1') and not sensor_ht.process(conn, cursor, current, message, data):
                continue
            elif message.get('model') == 'magnet':
                magnet.process(store, message, data)
            elif message.get('model') == 'gateway':
                gateway.process(store, message, data)
            current = {}
        elif service == 'yeelight':
            yeelight.process(data.decode())


问题


面经


文章

微信
公众号

扫码关注公众号