def test_connect_without_payload(self):
giveup_event = win32event.CreateEvent(None, 0, 0, None)
t = threading.Thread(target=self.connect_thread_runner,
args=(False, giveup_event))
t.start()
time.sleep(0.1)
s2 = socket.socket()
ol = pywintypes.OVERLAPPED()
s2.bind(('0.0.0.0', 0)) # connectex requires the socket be bound beforehand
try:
win32file.ConnectEx(s2, self.addr, ol)
except win32file.error as exc:
win32event.SetEvent(giveup_event)
if exc.winerror == 10022: # WSAEINVAL
raise TestSkipped("ConnectEx is not available on this platform")
raise # some error error we don't expect.
win32file.GetOverlappedResult(s2.fileno(), ol, 1)
ol = pywintypes.OVERLAPPED()
buff = win32file.AllocateReadBuffer(1024)
win32file.WSARecv(s2, buff, ol, 0)
length = win32file.GetOverlappedResult(s2.fileno(), ol, 1)
self.response = buff[:length]
self.assertEqual(self.response, str2bytes('some expected response'))
t.join(5)
self.failIf(t.isAlive(), "worker thread didn't terminate")
评论列表
文章目录