def test_handle_expt(self):
    # Verify that the client's handle_expt() callback runs when the
    # peer sends out-of-band (OOB) data.
    # Note: OOB data is only loosely supported and seldom used, so
    # this test may fail on some platforms.
    # (Comments are used instead of a docstring so the description
    # reported by the test runner stays the same.)
    oob_payload = b'\xf4'  # one byte, value 244

    class TestHandler(BaseTestHandler):
        # Handler created by the server: sends a single OOB byte to
        # the peer during construction.
        def __init__(self, conn):
            super().__init__(conn)
            self.socket.send(oob_payload, socket.MSG_OOB)

    class TestClient(BaseClient):
        # Client side: record that handle_expt() was invoked.
        def handle_expt(self):
            self.flag = True

    srv = TCPServer(TestHandler)
    cli = TestClient(srv.address)
    self.loop_waiting_for_flag(cli)
# Example source code using Python's MSG_OOB constant
def test_handle_expt(self):
    # Make sure handle_expt is called on OOB data received.
    # Note: this might fail on some platforms as OOB data is
    # tenuously supported and rarely used.
    class TestClient(BaseClient):
        # Client side: flag that the OOB notification arrived.
        def handle_expt(self):
            self.flag = True

    class TestHandler(BaseTestHandler):
        # Handler created by the server: sends one OOB byte to the
        # peer as soon as it is constructed.
        def __init__(self, conn):
            BaseTestHandler.__init__(self, conn)
            # Use a bytes literal rather than chr(244): the two are
            # the same object type on Python 2, but on Python 3
            # chr() yields str, which socket.send() rejects with
            # TypeError.
            self.socket.send(b'\xf4', socket.MSG_OOB)

    server = TCPServer(TestHandler)
    client = TestClient(server.address)
    self.loop_waiting_for_flag(client)
def test_handle_expt(self):
    # Make sure handle_expt is called on OOB data received.
    # Note: this might fail on some platforms as OOB data is
    # tenuously supported and rarely used.

    # A bytes literal works on both Python 2 and Python 3; the
    # previous chr(244) is a str on Python 3 and makes
    # socket.send() raise TypeError.
    oob_byte = b'\xf4'

    class TestClient(BaseClient):
        def handle_expt(self):
            # Reaching this callback is the success condition.
            self.flag = True

    class TestHandler(BaseTestHandler):
        def __init__(self, conn):
            BaseTestHandler.__init__(self, conn)
            # Push a single out-of-band byte to the connected peer.
            self.socket.send(oob_byte, socket.MSG_OOB)

    server = TCPServer(TestHandler)
    client = TestClient(server.address)
    self.loop_waiting_for_flag(client)
def test_handle_expt(self):
    # Check that handle_expt() is invoked once OOB data arrives.
    # Note: OOB data is only loosely supported and seldom used, so
    # this test may fail on some platforms.
    is_unix_family = HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX
    if is_unix_family:
        self.skipTest("Not applicable to AF_UNIX sockets.")

    oob_payload = b'\xf4'  # one byte, value 244

    class TestHandler(BaseTestHandler):
        # Handler created by the server: sends a single OOB byte to
        # the peer during construction.
        def __init__(self, conn):
            super().__init__(conn)
            self.socket.send(oob_payload, socket.MSG_OOB)

    class TestClient(BaseClient):
        def handle_expt(self):
            # Read the pending OOB data first, then set the flag.
            self.socket.recv(1024, socket.MSG_OOB)
            self.flag = True

    srv = BaseServer(self.family, self.addr, TestHandler)
    cli = TestClient(self.family, srv.address)
    self.loop_waiting_for_flag(cli)
def test_handle_expt(self):
    # Make sure handle_expt is called on OOB data received.
    # Note: this might fail on some platforms as OOB data is
    # tenuously supported and rarely used.
    class TestClient(BaseClient):
        # Client side: remember that handle_expt() fired.
        def handle_expt(self):
            self.flag = True

    class TestHandler(BaseTestHandler):
        # Handler created by the server: emits one OOB byte on
        # construction.
        def __init__(self, conn):
            BaseTestHandler.__init__(self, conn)
            # b'\xf4' equals chr(244) on Python 2 and, unlike
            # chr(244), is accepted by socket.send() on Python 3
            # (which requires a bytes-like object).
            self.socket.send(b'\xf4', socket.MSG_OOB)

    server = TCPServer(TestHandler)
    client = TestClient(server.address)
    self.loop_waiting_for_flag(client)
def test_handle_expt(self):
    # Ensure the handle_expt() callback is triggered by incoming
    # out-of-band (OOB) data.
    # Note: platforms differ in how well they support OOB data, so
    # this test is not guaranteed to pass everywhere.
    if HAS_UNIX_SOCKETS:
        if self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

    class TestHandler(BaseTestHandler):
        def __init__(self, conn):
            super().__init__(conn)
            # Send one byte (value 244) flagged as out-of-band.
            urgent = b'\xf4'
            self.socket.send(urgent, socket.MSG_OOB)

    class TestClient(BaseClient):
        def handle_expt(self):
            # Consume the OOB data, then mark the test as satisfied.
            self.socket.recv(1024, socket.MSG_OOB)
            self.flag = True

    oob_server = BaseServer(self.family, self.addr, TestHandler)
    oob_client = TestClient(self.family, oob_server.address)
    self.loop_waiting_for_flag(oob_client)
def test_handle_expt(self):
    # Make sure handle_expt is called on OOB data received.
    # Note: this might fail on some platforms as OOB data is
    # tenuously supported and rarely used.

    # Bytes literal instead of chr(244): same value on Python 2,
    # and avoids the TypeError socket.send() raises for str on
    # Python 3.
    oob_byte = b'\xf4'

    class TestClient(BaseClient):
        # Client side: note that the OOB callback was reached.
        def handle_expt(self):
            self.flag = True

    class TestHandler(BaseTestHandler):
        # Handler created by the server: sends the OOB byte right
        # after base-class initialisation.
        def __init__(self, conn):
            BaseTestHandler.__init__(self, conn)
            self.socket.send(oob_byte, socket.MSG_OOB)

    server = TCPServer(TestHandler)
    client = TestClient(server.address)
    self.loop_waiting_for_flag(client)
def test_handle_expt(self):
    # Confirm that receiving OOB data results in a handle_expt() call.
    # Note: OOB data is rarely used and unevenly supported, so this
    # test can fail on some platforms.
    skip_for_unix = HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX
    if skip_for_unix:
        self.skipTest("Not applicable to AF_UNIX sockets.")

    class TestClient(BaseClient):
        def handle_expt(self):
            # Pull the OOB data off the socket before setting the flag.
            oob_flags = socket.MSG_OOB
            self.socket.recv(1024, oob_flags)
            self.flag = True

    class TestHandler(BaseTestHandler):
        def __init__(self, conn):
            super().__init__(conn)
            # A single byte with value 244, sent out-of-band.
            self.socket.send(b'\xf4', socket.MSG_OOB)

    listener = BaseServer(self.family, self.addr, TestHandler)
    peer = TestClient(self.family, listener.address)
    self.loop_waiting_for_flag(peer)
def send_ping(raw_socket, dest_addr, pkt_id, seq_code, data_length=48):
    """Send one ICMP echo request through *raw_socket*.

    The packet is an 8-byte big-endian header (type 8, code 0,
    checksum, identifier, sequence number) followed by ``data_length``
    filler bytes. The checksum is whatever ``calc_crc16`` returns for
    the header (with its checksum field zeroed) plus the payload.

    Args:
        raw_socket: Object providing ``sendto(data, address)``;
            presumably a raw ICMP socket -- confirm against the caller.
        dest_addr: Destination host, used as the first element of the
            address tuple given to ``sendto``.
        pkt_id: Identifier field; packed as an unsigned 16-bit value.
        seq_code: Sequence-number field; packed as an unsigned 16-bit
            value.
        data_length: Number of ``b'p'`` payload bytes (default 48).

    Raises:
        struct.error: If ``pkt_id``, ``seq_code`` or the computed
            checksum does not fit in an unsigned 16-bit field.
    """
    icmp_type = 8  # echo request
    icmp_code = 0
    # icmp_type(1B) : icmp_code(1B) : checksum(2B) : id(2B) : seq(2B)
    header_format = '>bbHHH'

    data = b'p' * data_length

    # First pass: header with a zero checksum, used only as checksum input.
    blank_header = struct.pack(
        header_format, icmp_type, icmp_code, 0, pkt_id, seq_code)
    pkt_crc16 = calc_crc16(blank_header + data)

    # Second pass: the real header carrying the computed checksum.
    header = struct.pack(
        header_format, icmp_type, icmp_code, pkt_crc16, pkt_id, seq_code)
    packet = header + data

    # The address tuple needs a port, and raw sockets expect it to be 0.
    # The previous code passed socket.MSG_OOB here, which is a send
    # *flag*, not a port number, and only worked by accident.
    raw_socket.sendto(packet, (dest_addr, 0))
def test_oob_abor(self):
    # Issue ABOR the way RFC-959 prescribes: the Telnet IP/Synch
    # sequence is sent as OOB data, followed by the command itself.
    # FreeBSD and some other systems used to have trouble with this
    # because SO_OOBINLINE behaves differently there.
    # The test may also fail on platforms (e.g. Python CE) that
    # define the MSG_OOB constant anyway.
    sock = self.client.sock
    for code in (244, 255):
        # Each byte of the sequence goes out as its own OOB send.
        sock.sendall(b(chr(code)), socket.MSG_OOB)
    sock.sendall(b'abor\r\n')

    reply = self.client.getresp()
    self.assertEqual(reply[:3], '225')