def send(self, data, wait_for_response):
# header consists of magic "PBAU" sequence
# + protocol version (byte, currently 1)
# + domain id (integer)
# + message size (short)
# + connection id (int, user definable, defaults to 0)
# + protocol flag (byte, 0 for TCP)
# + checksum (byte)
header = struct.pack("!BlhlB", 1, self.domain, len(data), 0, 0)
checksum = struct.pack("!B", sum(bytearray(header)) % 255)
self.__sock.sendall(b'PBAU' + header + checksum + bytes(data))
if wait_for_response:
# receive only header (17 bytes)
# TODO # WARNING: This does not take into account
# TODO # cases where socket.recv returns less than 16
# TODO # in practice this will never happen though.
header = self.__sock.recv(17)
# check for magic bytes...
if header[0:4] == b'PBAU':
# ...then parse the rest
header_parsed = struct.unpack("!4sBlhlBB", header)
response_length = header_parsed[3]
return ByteUtil(self.__sock.recv(response_length))
# if not wait_for_response OR invalid response:
return ByteUtil()
python类recv()的实例源码
def testRecv(self):
# Testing large receive over TCP
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, MSG)
def testOverFlowRecv(self):
# Testing receive in chunks over TCP
seg1 = self.cli_conn.recv(len(MSG) - 3)
seg2 = self.cli_conn.recv(1024)
msg = seg1 + seg2
self.assertEqual(msg, MSG)
def testSendAll(self):
# Testing sendall() with a 2048 byte string over TCP
msg = b''
while 1:
read = self.cli_conn.recv(1024)
if not read:
break
msg += read
self.assertEqual(msg, b'f' * 2048)
def testFromFd(self):
# Testing fromfd()
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
self.assertIsInstance(sock, socket.socket)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def testDup(self):
# Testing dup()
sock = self.cli_conn.dup()
self.addCleanup(sock.close)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def testDetach(self):
# Testing detach()
fileno = self.cli_conn.fileno()
f = self.cli_conn.detach()
self.assertEqual(f, fileno)
# cli_conn cannot be used anymore...
self.assertRaises(socket.error, self.cli_conn.recv, 1024)
self.cli_conn.close()
# ...but we can create another socket using the (still open)
# file descriptor
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
self.addCleanup(sock.close)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def testSendtoAndRecv(self):
# Testing sendto() and Recv() over UDP
msg = self.serv.recv(len(MSG))
self.assertEqual(msg, MSG)
def testClose(self):
conn, addr = self.serv.accept()
conn.close()
sd = self.cli
read, write, err = select.select([sd], [], [], 1.0)
self.assertEqual(read, [sd])
self.assertEqual(sd.recv(1), b'')
# Calling close() many times should be safe.
conn.close()
conn.close()
def testRecv(self):
msg = self.serv.recv(1024)
self.assertEqual(msg, MSG)
def _testSend(self):
msg = self.cli.recv(1024)
self.assertEqual(msg, MSG)
def testMakefileAfterMakefileClose(self):
self.read_file.close()
msg = self.cli_conn.recv(len(MSG))
if isinstance(self.read_msg, str):
msg = msg.decode()
self.assertEqual(msg, self.read_msg)
def __init__(self, recv_funcs=()):
# A generator that returns callables that we'll call for each
# call to recv().
self._recv_step = iter(recv_funcs)
def testMakefileClose(self):
# The file returned by makefile should keep the socket open...
self.cli_conn.close()
msg = self.cli_conn.recv(1024)
self.assertEqual(msg, self.read_msg)
# ...until the file is itself closed
self.read_file.close()
self.assertRaises(socket.error, self.cli_conn.recv, 1024)
def _testSmallReadNonBlocking(self):
self.evt1.wait(1.0)
self.write_file.write(self.write_msg)
self.write_file.flush()
self.evt2.set()
# Avoid cloding the socket before the server test has finished,
# otherwise system recv() will return 0 instead of EWOULDBLOCK.
self.serv_finished.wait(5.0)
def _testOutsideTimeout(self):
self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
self.assertRaises(socket.timeout, lambda: sock.recv(5))
def testUDPTimeout(self):
def raise_timeout(*args, **kwargs):
self.serv.settimeout(1.0)
self.serv.recv(1024)
self.assertRaises(socket.timeout, raise_timeout,
"Error generating a timeout exception (UDP)")
def testTimeoutZero(self):
ok = False
try:
self.serv.settimeout(0.0)
foo = self.serv.recv(1024)
except socket.timeout:
self.fail("caught timeout instead of error (UDP)")
except socket.error:
ok = True
except:
self.fail("caught unexpected exception (UDP)")
if not ok:
self.fail("recv() returned success when we did not expect it")
def testStream(self):
msg = self.conn.recv(1024)
self.assertEqual(msg, MSG)
self.assertEqual(self.cliaddr, self.connaddr)
def testCreateConnectionBase(self):
conn, addr = self.serv.accept()
self.addCleanup(conn.close)
data = conn.recv(1024)
conn.sendall(data)