def connect_thread_runner(self, expect_payload, giveup_event):
# As Windows 2000 doesn't do ConnectEx, we need to use a non-blocking
# accept, as our test connection may never come. May as well use
# AcceptEx for this...
listener = socket.socket()
self.addr = ('localhost', random.randint(10000,64000))
listener.bind(self.addr)
listener.listen(1)
# create accept socket
accepter = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# An overlapped
overlapped = pywintypes.OVERLAPPED()
overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
# accept the connection.
if expect_payload:
buf_size = 1024
else:
# when we don't expect data we must be careful to only pass the
# exact number of bytes for the endpoint data...
buf_size = win32file.CalculateSocketEndPointSize(listener)
buffer = win32file.AllocateReadBuffer(buf_size)
win32file.AcceptEx(listener, accepter, buffer, overlapped)
# wait for the connection or our test to fail.
events = giveup_event, overlapped.hEvent
rc = win32event.WaitForMultipleObjects(events, False, 2000)
if rc == win32event.WAIT_TIMEOUT:
self.fail("timed out waiting for a connection")
if rc == win32event.WAIT_OBJECT_0:
# Our main thread running the test failed and will never connect.
return
# must be a connection.
nbytes = win32file.GetOverlappedResult(listener.fileno(), overlapped, False)
if expect_payload:
self.request = buffer[:nbytes]
accepter.send(str2bytes('some expected response'))
评论列表
文章目录