python类MSG_PEEK的实例源码

supersocket.py 文件源码 项目:CyberScan 作者: medbenali 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt.getlayer(Padding)
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt
supersocket.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt
supersocket.py 文件源码 项目:CVE-2016-6366 作者: RiskSense-Ops 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt[Padding]
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt
web.py 文件源码 项目:VManagePlatform 作者: welliamcao 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def serve():
    bindsocket = socket.socket()
    bindsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    #bindsocket.bind(('localhost', PORT))
    bindsocket.bind(('', PORT))
    bindsocket.listen(5)

    print("serving on port", PORT)

    while True:
        try:
            newsocket, from_addr = bindsocket.accept()
            peek = newsocket.recv(1024, socket.MSG_PEEK)
            if peek.startswith("\x16"):
                connstream = ssl.wrap_socket(
                        newsocket,
                        server_side=True,
                        certfile='self.pem',
                        ssl_version=ssl.PROTOCOL_TLSv1)
            else:
                connstream = newsocket

            do_request(connstream, from_addr)

        except Exception:
            traceback.print_exc()
SSL.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def recv_into(self, buffer, nbytes=None, flags=None):
        """
        Receive data on the connection and store the data into a buffer rather
        than creating a new string.

        :param buffer: The buffer to copy into.
        :param nbytes: (optional) The maximum number of bytes to read into the
            buffer. If not present, defaults to the size of the buffer. If
            larger than the size of the buffer, is reduced to the size of the
            buffer.
        :param flags: (optional) The only supported flag is ``MSG_PEEK``,
            all other flags are ignored.
        :return: The number of bytes read into the buffer.
        """
        if nbytes is None:
            nbytes = len(buffer)
        else:
            nbytes = min(nbytes, len(buffer))

        # We need to create a temporary buffer. This is annoying, it would be
        # better if we could pass memoryviews straight into the SSL_read call,
        # but right now we can't. Revisit this if CFFI gets that ability.
        buf = _ffi.new("char[]", nbytes)
        if flags is not None and flags & socket.MSG_PEEK:
            result = _lib.SSL_peek(self._ssl, buf, nbytes)
        else:
            result = _lib.SSL_read(self._ssl, buf, nbytes)
        self._raise_ssl_error(self._ssl, result)

        # This strange line is all to avoid a memory copy. The buffer protocol
        # should allow us to assign a CFFI buffer to the LHS of this line, but
        # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
        # wrap it in a memoryview, except on Python 2.6 which doesn't have a
        # memoryview type.
        try:
            buffer[:result] = memoryview(_ffi.buffer(buf, result))
        except NameError:
            buffer[:result] = _ffi.buffer(buf, result)

        return result
SSL.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def recv_into(self, buffer, nbytes=None, flags=None):
        """
        Receive data on the connection and store the data into a buffer rather
        than creating a new string.

        :param buffer: The buffer to copy into.
        :param nbytes: (optional) The maximum number of bytes to read into the
            buffer. If not present, defaults to the size of the buffer. If
            larger than the size of the buffer, is reduced to the size of the
            buffer.
        :param flags: (optional) The only supported flag is ``MSG_PEEK``,
            all other flags are ignored.
        :return: The number of bytes read into the buffer.
        """
        if nbytes is None:
            nbytes = len(buffer)
        else:
            nbytes = min(nbytes, len(buffer))

        # We need to create a temporary buffer. This is annoying, it would be
        # better if we could pass memoryviews straight into the SSL_read call,
        # but right now we can't. Revisit this if CFFI gets that ability.
        buf = _ffi.new("char[]", nbytes)
        if flags is not None and flags & socket.MSG_PEEK:
            result = _lib.SSL_peek(self._ssl, buf, nbytes)
        else:
            result = _lib.SSL_read(self._ssl, buf, nbytes)
        self._raise_ssl_error(self._ssl, result)

        # This strange line is all to avoid a memory copy. The buffer protocol
        # should allow us to assign a CFFI buffer to the LHS of this line, but
        # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
        # wrap it in a memoryview, except on Python 2.6 which doesn't have a
        # memoryview type.
        try:
            buffer[:result] = memoryview(_ffi.buffer(buf, result))
        except NameError:
            buffer[:result] = _ffi.buffer(buf, result)

        return result
SSL.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def recv_into(self, buffer, nbytes=None, flags=None):
        """
        Receive data on the connection and store the data into a buffer rather
        than creating a new string.

        :param buffer: The buffer to copy into.
        :param nbytes: (optional) The maximum number of bytes to read into the
            buffer. If not present, defaults to the size of the buffer. If
            larger than the size of the buffer, is reduced to the size of the
            buffer.
        :param flags: (optional) The only supported flag is ``MSG_PEEK``,
            all other flags are ignored.
        :return: The number of bytes read into the buffer.
        """
        if nbytes is None:
            nbytes = len(buffer)
        else:
            nbytes = min(nbytes, len(buffer))

        # We need to create a temporary buffer. This is annoying, it would be
        # better if we could pass memoryviews straight into the SSL_read call,
        # but right now we can't. Revisit this if CFFI gets that ability.
        buf = _ffi.new("char[]", nbytes)
        if flags is not None and flags & socket.MSG_PEEK:
            result = _lib.SSL_peek(self._ssl, buf, nbytes)
        else:
            result = _lib.SSL_read(self._ssl, buf, nbytes)
        self._raise_ssl_error(self._ssl, result)

        # This strange line is all to avoid a memory copy. The buffer protocol
        # should allow us to assign a CFFI buffer to the LHS of this line, but
        # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
        # wrap it in a memoryview, except on Python 2.6 which doesn't have a
        # memoryview type.
        try:
            buffer[:result] = memoryview(_ffi.buffer(buf, result))
        except NameError:
            buffer[:result] = _ffi.buffer(buf, result)

        return result
supersocket.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt
cache.py 文件源码 项目:netconf-proxy 作者: fortinet-solutions-cse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def socket_is_remote_closed (sock):
    rfds, unused, unused = select.select([sock], [], [], 0)
    try:
        if sock in rfds:
            buf = sock.recv(1, socket.MSG_PEEK)
            if len(buf) == 0:
                logger.debug("****** read 0 on peek assuming closed")
                return True
        return False
    except Exception as error:
        logger.debug("***** GOT EXCEPTION on read(PEEK) must be closed: %s", str(error))
        return True
test_ssl.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_peek(self):
        """
        `Connection.recv` peeks into the connection if `socket.MSG_PEEK` is
        passed.
        """
        server, client = loopback()
        server.send(b'xy')
        assert client.recv(2, MSG_PEEK) == b'xy'
        assert client.recv(2, MSG_PEEK) == b'xy'
        assert client.recv(2) == b'xy'
test_ssl.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_peek(self):
        server, client = loopback()
        server.send(b'xy')

        for _ in range(2):
            output_buffer = bytearray(5)
            assert client.recv_into(output_buffer, flags=MSG_PEEK) == 2
            assert output_buffer == bytearray(b'xy\x00\x00\x00')
SSL.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def recv_into(self, buffer, nbytes=None, flags=None):
        """
        Receive data on the connection and store the data into a buffer rather
        than creating a new string.

        :param buffer: The buffer to copy into.
        :param nbytes: (optional) The maximum number of bytes to read into the
            buffer. If not present, defaults to the size of the buffer. If
            larger than the size of the buffer, is reduced to the size of the
            buffer.
        :param flags: (optional) The only supported flag is ``MSG_PEEK``,
            all other flags are ignored.
        :return: The number of bytes read into the buffer.
        """
        if nbytes is None:
            nbytes = len(buffer)
        else:
            nbytes = min(nbytes, len(buffer))

        # We need to create a temporary buffer. This is annoying, it would be
        # better if we could pass memoryviews straight into the SSL_read call,
        # but right now we can't. Revisit this if CFFI gets that ability.
        buf = _no_zero_allocator("char[]", nbytes)
        if flags is not None and flags & socket.MSG_PEEK:
            result = _lib.SSL_peek(self._ssl, buf, nbytes)
        else:
            result = _lib.SSL_read(self._ssl, buf, nbytes)
        self._raise_ssl_error(self._ssl, result)

        # This strange line is all to avoid a memory copy. The buffer protocol
        # should allow us to assign a CFFI buffer to the LHS of this line, but
        # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
        # wrap it in a memoryview, except on Python 2.6 which doesn't have a
        # memoryview type.
        try:
            buffer[:result] = memoryview(_ffi.buffer(buf, result))
        except NameError:
            buffer[:result] = _ffi.buffer(buf, result)

        return result
iostream.py 文件源码 项目:microProxy 作者: mike820324 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _should_socket_close(self):
        _data = self.socket.recv(1, socket.MSG_PEEK)
        return len(_data) == 0
iostream.py 文件源码 项目:microProxy 作者: mike820324 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def peek(self, length):
        """Peek into the underline socket buffer.

        Args:
            length (int): The peeking buffer length.

        Returns:
            (bytes): Bytes of buffer.

        Raises:
            ValueError: If length is not int and smaller than one
            will raise ValueError.
        """
        if not isinstance(length, int) or length < 0:
            raise ValueError("Incorrect length.")

        # TODO: Change into nonblocking mode.
        self.socket.setblocking(True)
        while True:
            try:
                _data = self.socket.recv(length, socket.MSG_PEEK)
            except socket.error:
                continue
            except:
                raise
            else:
                return _data
            finally:
                self.socket.setblocking(False)
test_websocketproxy.py 文件源码 项目:realtimedisplay 作者: SuperDARNCanada 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def recv(self, amt, flags=None):
        res = self._data[0:amt]
        if not (flags & socket.MSG_PEEK):
            self._data = self._data[amt:]

        return res
test_websocket.py 文件源码 项目:realtimedisplay 作者: SuperDARNCanada 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def recv(self, amt, flags=None):
        res = self._data[0:amt]
        if not (flags & socket.MSG_PEEK):
            self._data = self._data[amt:]

        return res
supersocket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt
supersocket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt
supersocket.py 文件源码 项目:scapy-bpf 作者: guedou 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt
supersocket.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def recv(self, x=MTU):
        pkt = self.ins.recv(x, socket.MSG_PEEK)
        x = len(pkt)
        if x == 0:
            raise socket.error((100,"Underlying stream socket tore down"))
        pkt = self.basecls(pkt)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            del(pad.underlayer.payload)
        while pad is not None and not isinstance(pad, NoPayload):
            x -= len(pad.load)
            pad = pad.payload
        self.ins.recv(x)
        return pkt


问题


面经


文章

微信
公众号

扫码关注公众号