def test_client_disconnect(self):
"""Issue #95 Server must handle disconnect from client in the middle of response
"""
def long_response(environ, start_response):
start_response('200 OK', [('Content-Length', '9876')])
yield b'a' * 9876
server_sock = eventlet.listen(('localhost', 0))
self.server_addr = server_sock.getsockname()
server = wsgi.Server(server_sock, server_sock.getsockname(), long_response,
log=self.logfile)
def make_request():
sock = eventlet.connect(server_sock.getsockname())
sock.send(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
sock.close()
request_thread = eventlet.spawn(make_request)
client_sock, addr = server_sock.accept()
# Next line must not raise IOError -32 Broken pipe
server.process_request([addr, client_sock, wsgi.STATE_IDLE])
request_thread.wait()
server_sock.close()
python类connect()的实例源码
def test_disable_header_name_capitalization(self):
# Disable HTTP header name capitalization
#
# https://github.com/eventlet/eventlet/issues/80
random_case_header = ('eTAg', 'TAg-VAluE')
def wsgi_app(environ, start_response):
start_response('200 oK', [random_case_header])
return [b'']
self.spawn_server(site=wsgi_app, capitalize_response_headers=False)
sock = eventlet.connect(self.server_addr)
sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
result = read_http(sock)
sock.close()
self.assertEqual(result.status, 'HTTP/1.1 200 oK')
self.assertEqual(result.headers_lower[random_case_header[0].lower()], random_case_header[1])
self.assertEqual(result.headers_original[random_case_header[0]], random_case_header[1])
def test_chunked_readline_wsgi_override_minimum_chunk_size(self):
fd = self.connect()
fd.sendall(b"POST /yield_spaces/override_min HTTP/1.1\r\nContent-Length: 0\r\n\r\n")
resp_so_far = b''
with eventlet.Timeout(.1):
while True:
one_byte = fd.recv(1)
resp_so_far += one_byte
if resp_so_far.endswith(b'\r\n\r\n'):
break
self.assertEqual(fd.recv(1), b' ')
try:
with eventlet.Timeout(.1):
fd.recv(1)
except eventlet.Timeout:
pass
else:
assert False
self.yield_next_space = True
with eventlet.Timeout(.1):
self.assertEqual(fd.recv(1), b' ')
def test_chunked_readline_wsgi_not_override_minimum_chunk_size(self):
fd = self.connect()
fd.sendall(b"POST /yield_spaces HTTP/1.1\r\nContent-Length: 0\r\n\r\n")
resp_so_far = b''
try:
with eventlet.Timeout(.1):
while True:
one_byte = fd.recv(1)
resp_so_far += one_byte
if resp_so_far.endswith(b'\r\n\r\n'):
break
self.assertEqual(fd.recv(1), b' ')
except eventlet.Timeout:
pass
else:
assert False
def test_connect_tcp(self):
def accept_once(listenfd):
try:
conn, addr = listenfd.accept()
fd = conn.makefile(mode='wb')
conn.close()
fd.write(b'hello\n')
fd.close()
finally:
listenfd.close()
server = eventlet.listen(('0.0.0.0', 0))
eventlet.spawn_n(accept_once, server)
client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile('rb')
client.close()
assert fd.readline() == b'hello\n'
assert fd.read() == b''
fd.close()
check_hub()
def test_001_trampoline_timeout(self):
server_sock = eventlet.listen(('127.0.0.1', 0))
bound_port = server_sock.getsockname()[1]
def server(sock):
client, addr = sock.accept()
eventlet.sleep(0.1)
server_evt = eventlet.spawn(server, server_sock)
eventlet.sleep(0)
try:
desc = eventlet.connect(('127.0.0.1', bound_port))
hubs.trampoline(desc, read=True, write=False, timeout=0.001)
except eventlet.Timeout:
pass # test passed
else:
assert False, "Didn't timeout"
server_evt.wait()
check_hub()
def test_concurrency(self):
evt = eventlet.Event()
def waiter(sock, addr):
sock.sendall(b'hi')
evt.wait()
l = eventlet.listen(('localhost', 0))
eventlet.spawn(eventlet.serve, l, waiter, 5)
def test_client():
c = eventlet.connect(('localhost', l.getsockname()[1]))
# verify the client is connected by getting data
self.assertEqual(b'hi', c.recv(2))
return c
[test_client() for i in range(5)]
# very next client should not get anything
x = eventlet.with_timeout(
0.01,
test_client,
timeout_value="timed out")
self.assertEqual(x, "timed out")
def test_correct_upgrade_request_13(self):
for http_connection in ['Upgrade', 'UpGrAdE', 'keep-alive, Upgrade']:
connect = [
"GET /echo HTTP/1.1",
"Upgrade: websocket",
"Connection: %s" % http_connection,
"Host: %s:%s" % self.server_addr,
"Origin: http://%s:%s" % self.server_addr,
"Sec-WebSocket-Version: 13",
"Sec-WebSocket-Key: d9MXuOzlVQ0h+qRllvSCIg==",
]
sock = eventlet.connect(self.server_addr)
sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n'))
result = sock.recv(1024)
# The server responds the correct Websocket handshake
print('Connection string: %r' % http_connection)
self.assertEqual(result, six.b('\r\n'.join([
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
'Sec-WebSocket-Accept: ywSyWXCPNsDxLrQdQrn5RFNRfBU=\r\n\r\n',
])))
def test_send_recv_13(self):
connect = [
"GET /echo HTTP/1.1",
"Upgrade: websocket",
"Connection: Upgrade",
"Host: %s:%s" % self.server_addr,
"Origin: http://%s:%s" % self.server_addr,
"Sec-WebSocket-Version: 13",
"Sec-WebSocket-Key: d9MXuOzlVQ0h+qRllvSCIg==",
]
sock = eventlet.connect(self.server_addr)
sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n'))
sock.recv(1024)
ws = websocket.RFC6455WebSocket(sock, {}, client=True)
ws.send(b'hello')
assert ws.wait() == b'hello'
ws.send(b'hello world!\x01')
ws.send(u'hello world again!')
assert ws.wait() == b'hello world!\x01'
assert ws.wait() == u'hello world again!'
ws.close()
eventlet.sleep(0.01)
def connect_to(self,endpoint):
""" Use this to connect to the server if an endpoint is not passed in __init__
"""
self.tcp_sock = eventlet.connect(endpoint)
self.cipher = crypto.Cipher()
self.recv_buff = buffer.Buffer()
self.compression_enabled = False
self.protocol_mode = 0 # did you really think it made sense to set this to anything else you maniac?
self.send_q = eventlet.queue.LightQueue(0)
self.ready = True
self.pool.spawn_n(self.read_thread)
self.pool.spawn_n(self.send_thread)
def test_connect_timeout(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.1)
gs = greenio.GreenSocket(s)
try:
expect_socket_timeout(gs.connect, ('192.0.2.1', 80))
except socket.error as e:
# unreachable is also a valid outcome
if not get_errno(e) in (errno.EHOSTUNREACH, errno.ENETUNREACH):
raise
def test_recv_timeout(self):
listener = greenio.GreenSocket(socket.socket())
listener.bind(('', 0))
listener.listen(50)
evt = event.Event()
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
evt.wait()
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = greenio.GreenSocket(socket.socket())
client.settimeout(0.1)
client.connect(addr)
expect_socket_timeout(client.recv, 0)
expect_socket_timeout(client.recv, 8192)
evt.send()
gt.wait()
def test_send_timeout(self):
self.reset_timeout(2)
listener = bufsized(eventlet.listen(('', 0)))
evt = event.Event()
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
sock = bufsized(sock)
evt.wait()
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = bufsized(greenio.GreenSocket(socket.socket()))
client.connect(addr)
client.settimeout(0.00001)
msg = b"A" * 100000 # large enough number to overwhelm most buffers
# want to exceed the size of the OS buffer so it'll block in a
# single send
def send():
for x in range(10):
client.send(msg)
expect_socket_timeout(send)
evt.send()
gt.wait()
def test_sendall_timeout(self):
listener = greenio.GreenSocket(socket.socket())
listener.bind(('', 0))
listener.listen(50)
evt = event.Event()
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
evt.wait()
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = greenio.GreenSocket(socket.socket())
client.settimeout(0.1)
client.connect(addr)
# want to exceed the size of the OS buffer so it'll block
msg = b"A" * (8 << 20)
expect_socket_timeout(client.sendall, msg)
evt.send()
gt.wait()
def test_del_closes_socket(self):
def accept_once(listener):
# delete/overwrite the original conn
# object, only keeping the file object around
# closing the file object should close everything
try:
conn, addr = listener.accept()
conn = conn.makefile('wb')
conn.write(b'hello\n')
conn.close()
gc.collect()
self.assertWriteToClosedFileRaises(conn)
finally:
listener.close()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0))
server.listen(50)
killer = eventlet.spawn(accept_once, server)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile('rb')
client.close()
assert fd.read() == b'hello\n'
assert fd.read() == b''
killer.wait()
def test_blocking_accept_mark_as_reopened(self):
evt_hub = get_hub()
with mock.patch.object(evt_hub, "mark_as_reopened") as patched_mark_as_reopened:
def connect_once(listener):
# delete/overwrite the original conn
# object, only keeping the file object around
# closing the file object should close everything
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', listener.getsockname()[1]))
client.close()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0))
server.listen(50)
acceptlet = eventlet.spawn(connect_once, server)
conn, addr = server.accept()
conn.sendall(b'hello\n')
connfileno = conn.fileno()
conn.close()
assert patched_mark_as_reopened.called
assert patched_mark_as_reopened.call_count == 3, "3 fds were opened, but the hub was " \
"only notified {call_count} times" \
.format(call_count=patched_mark_as_reopened.call_count)
args, kwargs = patched_mark_as_reopened.call_args
assert args == (connfileno,), "Expected mark_as_reopened to be called " \
"with {expected_fileno}, but it was called " \
"with {fileno}".format(expected_fileno=connfileno,
fileno=args[0])
server.close()
def test_full_duplex(self):
large_data = b'*' * 10 * min_buf_size()
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(('127.0.0.1', 0))
listener.listen(50)
bufsized(listener)
def send_large(sock):
sock.sendall(large_data)
def read_large(sock):
result = sock.recv(len(large_data))
while len(result) < len(large_data):
result += sock.recv(len(large_data))
self.assertEqual(result, large_data)
def server():
(sock, addr) = listener.accept()
sock = bufsized(sock)
send_large_coro = eventlet.spawn(send_large, sock)
eventlet.sleep(0)
result = sock.recv(10)
expected = b'hello world'
while len(result) < len(expected):
result += sock.recv(10)
self.assertEqual(result, expected)
send_large_coro.wait()
server_evt = eventlet.spawn(server)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', listener.getsockname()[1]))
bufsized(client)
large_evt = eventlet.spawn(read_large, client)
eventlet.sleep(0)
client.sendall(b'hello world')
server_evt.wait()
large_evt.wait()
client.close()
def test_timeout_and_final_write(self):
# This test verifies that a write on a socket that we've
# stopped listening for doesn't result in an incorrect switch
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0))
server.listen(50)
bound_port = server.getsockname()[1]
def sender(evt):
s2, addr = server.accept()
wrap_wfile = s2.makefile('wb')
eventlet.sleep(0.02)
wrap_wfile.write(b'hi')
s2.close()
evt.send(b'sent via event')
evt = event.Event()
eventlet.spawn(sender, evt)
# lets the socket enter accept mode, which
# is necessary for connect to succeed on windows
eventlet.sleep(0)
try:
# try and get some data off of this pipe
# but bail before any is sent
eventlet.Timeout(0.01)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', bound_port))
wrap_rfile = client.makefile()
wrap_rfile.read(1)
self.fail()
except eventlet.Timeout:
pass
result = evt.wait()
self.assertEqual(result, b'sent via event')
server.close()
client.close()
def test_closure(self):
def spam_to_me(address):
sock = eventlet.connect(address)
while True:
try:
sock.sendall(b'hello world')
# Arbitrary delay to not use all available CPU, keeps the test
# running quickly and reliably under a second
time.sleep(0.001)
except socket.error as e:
if get_errno(e) == errno.EPIPE:
return
raise
server = eventlet.listen(('127.0.0.1', 0))
sender = eventlet.spawn(spam_to_me, server.getsockname())
client, address = server.accept()
server.close()
def reader():
try:
while True:
data = client.recv(1024)
assert data
# Arbitrary delay to not use all available CPU, keeps the test
# running quickly and reliably under a second
time.sleep(0.001)
except socket.error as e:
# we get an EBADF because client is closed in the same process
# (but a different greenthread)
if get_errno(e) != errno.EBADF:
raise
def closer():
client.close()
reader = eventlet.spawn(reader)
eventlet.spawn_n(closer)
reader.wait()
sender.wait()
def test_partial_write_295():
# https://github.com/eventlet/eventlet/issues/295
# `socket.makefile('w').writelines()` must send all
# despite partial writes by underlying socket
listen_socket = eventlet.listen(('localhost', 0))
original_accept = listen_socket.accept
def talk(conn):
f = conn.makefile('wb')
line = b'*' * 2140
f.writelines([line] * 10000)
conn.close()
def accept():
connection, address = original_accept()
original_send = connection.send
def slow_send(b, *args):
b = b[:1031]
return original_send(b, *args)
connection.send = slow_send
eventlet.spawn(talk, connection)
return connection, address
listen_socket.accept = accept
eventlet.spawn(listen_socket.accept)
sock = eventlet.connect(listen_socket.getsockname())
with eventlet.Timeout(10):
bs = sock.makefile('rb').read()
assert len(bs) == 21400000
assert bs == (b'*' * 21400000)