python类MSG_OOB的实例源码

test_asyncore.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # Check that the client's handle_expt() callback runs once
        # out-of-band data has been sent to it.
        # Caveat: OOB data has only tenuous platform support and is rarely
        # used, so this test might fail on some platforms.
        oob_payload = b'\xf4'  # one byte, value 244

        class TestHandler(BaseTestHandler):
            # Sends the payload flagged MSG_OOB from its constructor, right
            # after the base handler has been initialised.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(oob_payload, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Sets self.flag when handle_expt() is invoked; the client is
            # what loop_waiting_for_flag() is given below.
            def handle_expt(self):
                self.flag = True

        oob_server = TCPServer(TestHandler)
        flag_client = TestClient(oob_server.address)
        self.loop_waiting_for_flag(flag_client)
test_asyncore.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # Check that the client's handle_expt() callback runs once
        # out-of-band data has been sent to it.
        # Caveat: OOB data has only tenuous platform support and is rarely
        # used, so this test might fail on some platforms.
        oob_payload = chr(244)

        class TestHandler(BaseTestHandler):
            # Sends the payload flagged MSG_OOB from its constructor, right
            # after the base handler has been initialised.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(oob_payload, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Sets self.flag when handle_expt() is invoked; the client is
            # what loop_waiting_for_flag() is given below.
            def handle_expt(self):
                self.flag = True

        oob_server = TCPServer(TestHandler)
        flag_client = TestClient(oob_server.address)
        self.loop_waiting_for_flag(flag_client)
test_asyncore.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # handle_expt() must be invoked on the client when the peer sends
        # out-of-band data.
        # Note: platform support for OOB data is tenuous and the feature is
        # rarely used, so a failure here may be platform-specific.
        urgent_data = chr(244)

        class TestHandler(BaseTestHandler):
            # Constructor initialises the base handler, then immediately
            # sends the single byte with the MSG_OOB flag.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(urgent_data, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Records the handle_expt() call by setting self.flag.
            def handle_expt(self):
                self.flag = True

        srv = TCPServer(TestHandler)
        cli = TestClient(srv.address)
        self.loop_waiting_for_flag(cli)
test_asyncore.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # Check that the client's handle_expt() callback runs once
        # out-of-band data has been sent to it.
        # Caveat: OOB data has only tenuous platform support and is rarely
        # used, so this test might fail on some platforms.
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        oob_payload = b'\xf4'  # one byte, value 244

        class TestHandler(BaseTestHandler):
            # Sends the payload flagged MSG_OOB from its constructor, right
            # after the base handler has been initialised.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(oob_payload, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Reads the pending out-of-band data first, then sets self.flag
            # to signal that handle_expt() ran.
            def handle_expt(self):
                self.socket.recv(1024, socket.MSG_OOB)
                self.flag = True

        oob_server = BaseServer(self.family, self.addr, TestHandler)
        flag_client = TestClient(self.family, oob_server.address)
        self.loop_waiting_for_flag(flag_client)
test_asyncore.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # Exercise the exceptional-condition path: once the handler sends
        # out-of-band data, the client's handle_expt() should be called.
        # Note: OOB data is tenuously supported and rarely used, so this
        # might fail on some platforms.
        marker = chr(244)

        class TestHandler(BaseTestHandler):
            # After base initialisation, send the marker byte as OOB data.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(marker, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Flip self.flag so the waiting loop below can observe the call.
            def handle_expt(self):
                self.flag = True

        listening_server = TCPServer(TestHandler)
        connected_client = TestClient(listening_server.address)
        self.loop_waiting_for_flag(connected_client)
test_asyncore.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # handle_expt() must be invoked on the client when the peer sends
        # out-of-band data.
        # Note: platform support for OOB data is tenuous and the feature is
        # rarely used, so a failure here may be platform-specific.
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        urgent_data = b'\xf4'  # one byte, value 244

        class TestHandler(BaseTestHandler):
            # Constructor initialises the base handler, then immediately
            # sends the single byte with the MSG_OOB flag.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(urgent_data, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Receives with MSG_OOB before recording the call in self.flag.
            def handle_expt(self):
                self.socket.recv(1024, socket.MSG_OOB)
                self.flag = True

        srv = BaseServer(self.family, self.addr, TestHandler)
        cli = TestClient(self.family, srv.address)
        self.loop_waiting_for_flag(cli)
test_asyncore.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # Confirm that receiving out-of-band data triggers handle_expt()
        # on the client.
        # Note: this may fail on some platforms, since OOB data is only
        # tenuously supported and rarely used.
        oob_char = chr(244)

        class TestHandler(BaseTestHandler):
            # Initialise the base handler, then send one character with
            # the MSG_OOB flag.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(oob_char, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Mark the callback as having fired.
            def handle_expt(self):
                self.flag = True

        test_server = TCPServer(TestHandler)
        test_client = TestClient(test_server.address)
        self.loop_waiting_for_flag(test_client)
test_asyncore.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_handle_expt(self):
        # Confirm that receiving out-of-band data triggers handle_expt()
        # on the client.
        # Note: this may fail on some platforms, since OOB data is only
        # tenuously supported and rarely used.
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        oob_byte = b'\xf4'  # one byte, value 244

        class TestHandler(BaseTestHandler):
            # Initialise the base handler, then send one byte with the
            # MSG_OOB flag.
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(oob_byte, socket.MSG_OOB)

        class TestClient(BaseClient):
            # Pull the out-of-band data off the socket, then mark the
            # callback as having fired.
            def handle_expt(self):
                self.socket.recv(1024, socket.MSG_OOB)
                self.flag = True

        test_server = BaseServer(self.family, self.addr, TestHandler)
        test_client = TestClient(self.family, test_server.address)
        self.loop_waiting_for_flag(test_client)
pyping.py 文件源码 项目:do-latency 作者: dizballanze 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_ping(raw_socket, dest_addr, pkt_id, seq_code, data_length=48):
    """Build an ICMP echo request and send it to *dest_addr*.

    The packet is an 8-byte header (type 8, code 0, checksum, id,
    sequence; big-endian) followed by ``data_length`` bytes of ``b'p'``.

    Args:
        raw_socket: Socket object whose ``sendto`` is used to transmit.
        dest_addr: Destination host, used as the host part of the address.
        pkt_id: Value packed into the 16-bit identifier field.
        seq_code: Value packed into the 16-bit sequence field.
        data_length: Number of payload bytes appended to the header.

    Raises:
        struct.error: If ``pkt_id``, ``seq_code`` or the checksum returned
            by ``calc_crc16`` does not fit an unsigned 16-bit field.
    """
    # icmp_type(1B):icmp_code(1B):checksum(2B):id(2B):seq(2B)
    header_format = '>BBHHH'
    icmp_echo_request = 8
    icmp_code = 0

    payload = b'p' * data_length

    # The checksum covers the whole packet with the checksum field zeroed,
    # so pack a provisional header first, then re-pack with the real value.
    provisional = struct.pack(
        header_format, icmp_echo_request, icmp_code, 0, pkt_id, seq_code)
    checksum = calc_crc16(provisional + payload)
    header = struct.pack(
        header_format, icmp_echo_request, icmp_code, checksum, pkt_id,
        seq_code)

    # ICMP has no ports. The original passed socket.MSG_OOB (a send-flags
    # constant) in the port slot of the address tuple; use an explicit 0
    # instead of a platform-defined, unrelated constant.
    raw_socket.sendto(header + payload, (dest_addr, 0))
test_functional.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_oob_abor(self):
        # Issue ABOR the way RFC-959 prescribes: the Telnet IP/Synch
        # sequence goes out as OOB data, followed by the command itself.
        # FreeBSD and similar systems used to have trouble here because
        # of a different SO_OOBINLINE behavior.
        # The test may also fail on some platforms (e.g. Python CE) even
        # though the MSG_OOB constant is defined.
        control_sock = self.client.sock
        for code in (244, 255):
            control_sock.sendall(b(chr(code)), socket.MSG_OOB)
        control_sock.sendall(b'abor\r\n')
        reply = self.client.getresp()
        self.assertEqual(reply[:3], '225')


问题


面经


文章

微信
公众号

扫码关注公众号