def testAcceptEx(self):
    """Round-trip "hello" through the AcceptEx worker using overlapped I/O.

    Starts ``self.acceptWorker`` on a background thread, connects to it over
    loopback, sends ``"hello"`` with ``WSASend`` and then checks that an
    overlapped ``WSARecv`` into a buffer from ``AllocateReadBuffer`` yields
    the same bytes.  It also checks that ``WSARecv`` rejects a plain string
    as its receive buffer, and that the worker signals ``stopped`` within
    two seconds.
    """
    port = 4680
    running = threading.Event()
    stopped = threading.Event()
    t = threading.Thread(target=self.acceptWorker,
                         args=(port, running, stopped))
    t.start()
    # Give the worker up to 2 seconds to report that it is ready.
    running.wait(2)
    if not running.is_set():
        self.fail("AcceptEx Worker thread failed to start")
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect(('127.0.0.1', port))
        win32file.WSASend(s, str2bytes("hello"), None)
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        # Like above - WSARecv used to allow strings as the receive buffer!!
        buffer = " " * 10
        self.assertRaises(TypeError, win32file.WSARecv, s, buffer, overlapped)
        # This one should work :)
        buffer = win32file.AllocateReadBuffer(10)
        win32file.WSARecv(s, buffer, overlapped)
        # Final argument True: wait for the overlapped receive to finish.
        nbytes = win32file.GetOverlappedResult(s.fileno(), overlapped, True)
        got = buffer[:nbytes]
        self.assertEqual(got, str2bytes("hello"))
        # thread should have stopped
        stopped.wait(2)
        if not stopped.is_set():
            self.fail("AcceptEx Worker thread failed to successfully stop")
    finally:
        # Always release the client socket, even when an assertion or a
        # Win32 call above raises.
        s.close()
# Comment list
# Table of contents