def parent(signal_path, pid):
    """Poll a signal file for a port, connect to it, then wait for ``done``.

    The file at ``signal_path`` is read repeatedly until its stripped
    contents parse as an integer port.  A connection is then opened to
    ``127.0.0.1`` on that port, after which the file is polled again until
    it holds at least two whitespace-separated tokens; the second token
    must be ``b'done'``.  Prints ``pass`` on success.

    :param signal_path: path of the file used to exchange state.
    :param pid: not used by this function.
    """
    # NOTE(review): presumably arms a 5 second deadline for the whole
    # function -- confirm against the eventlet Timeout documentation.
    eventlet.Timeout(5)
    port = None
    while True:
        try:
            # Use a context manager so each polling attempt releases its
            # file handle instead of leaking one per iteration.
            with open(signal_path, 'rb') as signal_file:
                contents = signal_file.read()
            port = int(contents.strip())
            break
        except Exception:
            # The file may be missing or not yet hold a number; retry.
            eventlet.sleep(0.1)
    eventlet.connect(('127.0.0.1', port))
    while True:
        try:
            with open(signal_path, 'rb') as signal_file:
                contents = signal_file.read()
            result = contents.split()[1]
            break
        except Exception:
            # The second token has not been written yet; retry.
            eventlet.sleep(0.1)
    assert result == b'done', repr(result)
    print('pass')
# Python: example source code using connect()
def test_hub_exceptions(self):
    """Check that a KeyError raised in a greenlet is written to stderr.

    With ``debug.hub_exceptions`` enabled, a greenlet that raises
    ``KeyError`` after receiving one byte must re-raise it from ``wait()``
    and the captured stderr must contain ``KeyError: 1``.
    """
    debug.hub_exceptions(True)
    try:
        server = eventlet.listen(('0.0.0.0', 0))
        client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
        client_2, addr = server.accept()

        def hurl(s):
            s.recv(1)
            {}[1]  # keyerror

        with capture_stderr() as fake:
            gt = eventlet.spawn(hurl, client_2)
            eventlet.sleep(0)
            client.send(b' ')
            eventlet.sleep(0)
            # allow the "hurl" greenlet to trigger the KeyError
            # not sure why the extra context switch is needed
            eventlet.sleep(0)
        self.assertRaises(KeyError, gt.wait)
    finally:
        # Always switch the debug flag back off, even when an earlier step
        # fails, so the setting does not leak into later tests.
        debug.hub_exceptions(False)
    # look for the KeyError exception in the traceback
    assert 'KeyError: 1' in fake.getvalue(), "Traceback not in:\n" + fake.getvalue()
def test_raised_multiple_readers(self):
    """A second concurrent ``recv`` on the same socket raises RuntimeError.

    One greenthread blocks in ``recv`` on the client socket; a second
    ``recv`` from the test body must raise ``RuntimeError`` while
    ``hub_prevent_multiple_readers`` is enabled.
    """
    debug.hub_prevent_multiple_readers(True)

    def handle(conn, peer):
        # Serve one byte exchange, then stop the serve loop.
        conn.recv(1)
        conn.sendall(b"a")
        raise eventlet.StopServe()

    def reader(conn):
        conn.recv(1)

    listener = eventlet.listen(('127.0.0.1', 0))
    eventlet.spawn(eventlet.serve, listener, handle)

    port = listener.getsockname()[1]
    client = eventlet.connect(('127.0.0.1', port))
    first_reader = eventlet.spawn(reader, client)
    # Yield once so the spawned reader gets to block in recv first.
    eventlet.sleep(0)
    self.assertRaises(RuntimeError, client.recv, 1)
    client.sendall(b'b')
    first_reader.wait()
def test_zero_timeout_and_back(self):
    """Switch a client between a 0.05 timeout and timeout 0 and back.

    With a positive timeout ``recv`` must raise ``socket.timeout``; with a
    zero timeout it must raise ``socket.error`` carrying ``EAGAIN``.
    """
    listener = eventlet.listen(('', 0))
    # Keep reference to server side of socket
    acceptor = eventlet.spawn(listener.accept)
    client = eventlet.connect(listener.getsockname())

    # Positive timeout: must raise socket.timeout
    client.settimeout(0.05)
    self.assertRaises(socket.timeout, client.recv, 1)

    # Zero timeout: must raise socket.error with EAGAIN
    client.settimeout(0)
    try:
        client.recv(1)
    except socket.error as exc:
        assert get_errno(exc) == errno.EAGAIN
    else:
        assert False

    # Positive timeout again: socket.timeout once more
    client.settimeout(0.05)
    self.assertRaises(socket.timeout, client.recv, 1)
    acceptor.wait()
def test_socket_file_read_non_int():
    """``read()`` on a socket file object must reject a non-integer size.

    Passing a string as the size argument has to raise ``TypeError``
    within one second.
    """
    listen_socket = eventlet.listen(('localhost', 0))

    def server():
        peer, _ = listen_socket.accept()
        peer.recv(1)
        peer.sendall(b'response')
        peer.close()

    eventlet.spawn(server)
    client = eventlet.connect(listen_socket.getsockname())

    stream = client.makefile('rwb')
    stream.write(b'?')
    stream.flush()
    with eventlet.Timeout(1):
        try:
            stream.read("This shouldn't work")
        except TypeError:
            pass
        else:
            assert False
def test_recv_type():
    """``recv`` must return ``bytes`` when the peer runs in an OS thread."""
    # https://github.com/eventlet/eventlet/issues/245
    # socket recv returning multiple data types
    # For this test to work, client and server have to be in separate
    # processes or OS threads. Just running two greenthreads gives
    # false test pass.
    real_threading = eventlet.patcher.original('threading')
    bound_addr = []

    def server():
        listener = eventlet.listen(('127.0.0.1', 0))
        # Publish the bound address to the client through the shared list.
        bound_addr[:] = listener.getsockname()
        eventlet.sleep(0.2)

    worker = real_threading.Thread(target=server)
    worker.start()
    # Give the server thread time to bind and publish its address.
    eventlet.sleep(0.1)
    client = eventlet.connect(tuple(bound_addr))
    received = client.recv(1)
    assert isinstance(received, bytes)
def test_correct_upgrade_request_75(self):
    """An upgrade request with ``WebSocket-Protocol`` gets the expected reply."""
    addr = self.server_addr
    request_lines = (
        "GET /echo HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "WebSocket-Protocol: ws",
    )
    expected_lines = (
        'HTTP/1.1 101 Web Socket Protocol Handshake',
        'Upgrade: WebSocket',
        'Connection: Upgrade',
        'WebSocket-Origin: http://%s:%s' % addr,
        'WebSocket-Location: ws://%s:%s/echo\r\n\r\n' % addr,
    )
    payload = six.b('\r\n'.join(request_lines) + '\r\n\r\n')

    sock = eventlet.connect(self.server_addr)
    sock.sendall(payload)
    result = sock.recv(1024)
    # The server responds the correct Websocket handshake
    self.assertEqual(result, six.b('\r\n'.join(expected_lines)))
def test_correct_upgrade_request_76(self):
    """An upgrade request with ``Sec-WebSocket-Key1/2`` gets the expected reply."""
    addr = self.server_addr
    request_lines = (
        "GET /echo HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "Sec-WebSocket-Protocol: ws",
        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5",
        "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00",
    )
    expected_lines = (
        'HTTP/1.1 101 WebSocket Protocol Handshake',
        'Upgrade: WebSocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Origin: http://%s:%s' % addr,
        'Sec-WebSocket-Protocol: ws',
        'Sec-WebSocket-Location: ws://%s:%s/echo\r\n\r\n8jKS\'y:G*Co,Wxa-' % addr,
    )
    # The eight bytes after the blank line are sent as part of the request.
    payload = six.b('\r\n'.join(request_lines) + '\r\n\r\n^n:ds[4U')

    sock = eventlet.connect(self.server_addr)
    sock.sendall(payload)
    result = sock.recv(1024)
    # The server responds the correct Websocket handshake
    self.assertEqual(result, six.b('\r\n'.join(expected_lines)))
def test_query_string(self):
    """The request's query string must appear in ``Sec-WebSocket-Location``."""
    # verify that the query string comes out the other side unscathed
    addr = self.server_addr
    request_lines = (
        "GET /echo?query_string HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "Sec-WebSocket-Protocol: ws",
        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5",
        "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00",
    )
    expected_lines = (
        'HTTP/1.1 101 WebSocket Protocol Handshake',
        'Upgrade: WebSocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Origin: http://%s:%s' % addr,
        'Sec-WebSocket-Protocol: ws',
        'Sec-WebSocket-Location: '
        'ws://%s:%s/echo?query_string\r\n\r\n8jKS\'y:G*Co,Wxa-' % addr,
    )
    payload = six.b('\r\n'.join(request_lines) + '\r\n\r\n^n:ds[4U')

    sock = eventlet.connect(self.server_addr)
    sock.sendall(payload)
    result = sock.recv(1024)
    self.assertEqual(result, six.b('\r\n'.join(expected_lines)))
def test_empty_query_string(self):
    """A bare trailing ``?`` must be kept in ``Sec-WebSocket-Location``."""
    # verify that a single trailing ? doesn't get nuked
    addr = self.server_addr
    request_lines = (
        "GET /echo? HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "Sec-WebSocket-Protocol: ws",
        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5",
        "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00",
    )
    expected_lines = (
        'HTTP/1.1 101 WebSocket Protocol Handshake',
        'Upgrade: WebSocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Origin: http://%s:%s' % addr,
        'Sec-WebSocket-Protocol: ws',
        'Sec-WebSocket-Location: ws://%s:%s/echo?\r\n\r\n8jKS\'y:G*Co,Wxa-' % addr,
    )
    payload = six.b('\r\n'.join(request_lines) + '\r\n\r\n^n:ds[4U')

    sock = eventlet.connect(self.server_addr)
    sock.sendall(payload)
    result = sock.recv(1024)
    self.assertEqual(result, six.b('\r\n'.join(expected_lines)))
def test_sending_messages_to_websocket_75(self):
    """Frames sent to ``/echo`` come back unchanged, even when split in two."""
    addr = self.server_addr
    request_lines = (
        "GET /echo HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "WebSocket-Protocol: ws",
    )
    sock = eventlet.connect(self.server_addr)
    sock.sendall(six.b('\r\n'.join(request_lines) + '\r\n\r\n'))
    # Read and discard the handshake reply.
    sock.recv(1024)

    # One complete frame in a single send.
    sock.sendall(b'\x00hello\xFF')
    echoed = sock.recv(1024)
    self.assertEqual(echoed, b'\x00hello\xff')

    # One frame delivered in two sends with a short pause in between.
    sock.sendall(b'\x00start')
    eventlet.sleep(0.001)
    sock.sendall(b' end\xff')
    echoed = sock.recv(1024)
    self.assertEqual(echoed, b'\x00start end\xff')

    sock.shutdown(socket.SHUT_RDWR)
    sock.close()
    eventlet.sleep(0.01)
def test_getting_messages_from_websocket_75(self):
    """``/range`` must deliver ``msg 0`` through ``msg 9`` after the handshake."""
    addr = self.server_addr
    request_lines = (
        "GET /range HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "WebSocket-Protocol: ws",
    )
    sock = eventlet.connect(self.server_addr)
    sock.sendall(six.b('\r\n'.join(request_lines) + '\r\n\r\n'))
    resp = sock.recv(1024)
    _headers, first_frame = resp.split(b'\r\n\r\n')
    frame_marks = b'\x00\xff'
    msgs = [first_frame.strip(frame_marks)]
    # Ten further reads, each stripped of the frame delimiter bytes.
    for _ in range(10):
        msgs.append(sock.recv(20).strip(frame_marks))
    # Last item in msgs is an empty string
    expected = [six.b('msg %d' % i) for i in range(10)]
    self.assertEqual(msgs[:-1], expected)
def test_getting_messages_from_websocket_76(self):
    """``/range`` must deliver ``msg 0`` through ``msg 9`` (Sec-WebSocket keys)."""
    addr = self.server_addr
    request_lines = (
        "GET /range HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "Sec-WebSocket-Protocol: ws",
        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5",
        "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00",
    )
    sock = eventlet.connect(self.server_addr)
    sock.sendall(six.b('\r\n'.join(request_lines) + '\r\n\r\n^n:ds[4U'))
    resp = sock.recv(1024)
    _headers, after_headers = resp.split(b'\r\n\r\n')
    frame_marks = b'\x00\xff'
    # Skip the first 16 bytes that follow the header block.
    msgs = [after_headers[16:].strip(frame_marks)]
    for _ in range(10):
        msgs.append(sock.recv(20).strip(frame_marks))
    # Last item in msgs is an empty string
    expected = [six.b('msg %d' % i) for i in range(10)]
    self.assertEqual(msgs[:-1], expected)
def test_server_closing_connect_76(self):
    """A request for ``/`` must be answered with ``b'\\xff\\x00'``.

    Those two bytes are expected right after the first 16 bytes that follow
    the handshake header block.
    """
    addr = self.server_addr
    request_lines = (
        "GET / HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % addr,
        "Origin: http://%s:%s" % addr,
        "Sec-WebSocket-Protocol: ws",
        "Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5",
        "Sec-WebSocket-Key2: 12998 5 Y3 1  .P00",
    )
    sock = eventlet.connect(self.server_addr)
    sock.sendall(six.b('\r\n'.join(request_lines) + '\r\n\r\n^n:ds[4U'))
    resp = sock.recv(1024)
    _headers, after_headers = resp.split(b'\r\n\r\n')
    # The remote server should have immediately closed the connection.
    self.assertEqual(after_headers[16:], b'\xff\x00')
def test_close_idle(self):
    """Killing the server must let the pool drain while a websocket is idle.

    A websocket connection is opened against a server spawned with a custom
    ``GreenPool``, one frame is echoed, and then ``self.killer`` is killed
    with ``KeyboardInterrupt``.  ``pool.waitall()`` must return within one
    second.
    """
    pool = eventlet.GreenPool()
    # use log=stderr when test runner can capture it
    self.spawn_server(custom_pool=pool, log=sys.stdout)
    connect = (
        'GET /echo HTTP/1.1',
        'Upgrade: WebSocket',
        'Connection: Upgrade',
        'Host: %s:%s' % self.server_addr,
        'Origin: http://%s:%s' % self.server_addr,
        'Sec-WebSocket-Protocol: ws',
        'Sec-WebSocket-Key1: 4 @1  46546xW%0l 1 5',
        'Sec-WebSocket-Key2: 12998 5 Y3 1  .P00',
    )
    sock = eventlet.connect(self.server_addr)
    sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
    # Read and discard the handshake reply.
    sock.recv(1024)
    sock.sendall(b'\x00hello\xff')
    result = sock.recv(1024)
    # Compare against the echoed frame.  The previous form,
    # ``assert result, b'...'``, only checked that ``result`` was non-empty
    # and used the expected bytes as the assertion message.
    assert result == b'\x00hello\xff', repr(result)
    self.killer.kill(KeyboardInterrupt)
    with eventlet.Timeout(1):
        pool.waitall()
def test_ssl_close(self):
    """After the client shuts down, the server's next ``recv`` yields ``b''``.

    A ``greenio.SSL.ZeroReturnError`` from that ``recv`` is also accepted.
    """
    def serve(listener):
        conn, _peer = listener.accept()
        conn.recv(8192)
        try:
            self.assertEqual(b'', conn.recv(8192))
        except greenio.SSL.ZeroReturnError:
            pass

    listener_sock = listen_ssl_socket()
    server_coro = eventlet.spawn(serve, listener_sock)

    plain_client = eventlet.connect(listener_sock.getsockname())
    client = ssl.wrap_socket(plain_client)
    client.sendall(b'X')
    greenio.shutdown_safe(client)
    client.close()
    server_coro.wait()
def test_ssl_unwrap(self):
    """Exchange data before wrapping, while wrapped, and after unwrapping.

    The client sends ``before`` over the plain socket, ``during`` over the
    SSL-wrapped socket, and ``after`` over the socket returned by
    ``unwrap()``; the server asserts that it receives each in turn.
    """
    def serve():
        # ``listener`` is bound below, before this greenthread runs.
        conn, _peer = listener.accept()
        self.assertEqual(conn.recv(6), b'before')
        conn_ssl = ssl.wrap_socket(conn, tests.private_key_file, tests.certificate_file,
                                   server_side=True)
        conn_ssl.do_handshake()
        self.assertEqual(conn_ssl.recv(6), b'during')
        conn_plain = conn_ssl.unwrap()
        self.assertEqual(conn_plain.recv(5), b'after')
        conn_plain.close()

    listener = eventlet.listen(('127.0.0.1', 0))
    server_coro = eventlet.spawn(serve)

    client = eventlet.connect(listener.getsockname())
    client.sendall(b'before')

    client_ssl = ssl.wrap_socket(client)
    client_ssl.do_handshake()
    client_ssl.sendall(b'during')

    client_plain = client_ssl.unwrap()
    client_plain.sendall(b'after')
    server_coro.wait()
def spawn_server(self, **kwargs):
    """Spawns a new wsgi server with the given arguments using
    :meth:`spawn_thread`.

    Defaults of ``max_size=128``, ``log=self.logfile`` and
    ``site=self.site`` are applied first and may be overridden by
    ``kwargs``.  When no ``sock`` is supplied, a listener on
    ``('localhost', 0)`` is created.

    Sets `self.server_addr` to (host, port) tuple suitable for `socket.connect`.
    """
    self.logfile = six.StringIO()
    options = {
        'max_size': 128,
        'log': self.logfile,
        'site': self.site,
    }
    options.update(kwargs)
    if 'sock' not in options:
        options['sock'] = eventlet.listen(('localhost', 0))
    listener = options['sock']
    self.server_addr = listener.getsockname()
    self.spawn_thread(wsgi.server, **options)
def test_006_reject_long_urls(self):
    """A request path of 3000 ``path`` segments must be answered with 414."""
    sock = eventlet.connect(self.server_addr)
    long_path = '/'.join(['path'] * 3000)
    request = 'GET /%s HTTP/1.0\r\nHost: localhost\r\n\r\n' % long_path
    send_expect_close(sock, request.encode())
    reader = sock.makefile('rb')
    status_line = reader.readline()
    if status_line:
        # windows closes the socket before the data is flushed,
        # so we never get anything back
        status_code = status_line.split(b' ')[1]
        self.assertEqual(status_code, b'414')
    reader.close()
def test_007_get_arg(self):
    """The app must see exactly the declared request body and its ``a`` field.

    A POST with ``Content-Length: 3`` and body ``a=a`` is followed by extra
    bytes on the same socket; the response must report the body as ``a=a``.
    """
    # define a new handler that does a get_arg as well as a read_body
    def new_app(env, start_response):
        body = bytes_to_str(env['wsgi.input'].read())
        # ``cgi.parse_qs`` was removed in Python 3.8; the urllib.parse
        # function reached through six works on both Python 2 and 3.
        a = six.moves.urllib.parse.parse_qs(body).get('a', [1])[0]
        start_response('200 OK', [('Content-type', 'text/plain')])
        return [six.b('a is %s, body is %s' % (a, body))]

    self.site.application = new_app
    sock = eventlet.connect(self.server_addr)
    request = b'\r\n'.join((
        b'POST / HTTP/1.0',
        b'Host: localhost',
        b'Content-Length: 3',
        b'',
        b'a=a'))
    sock.sendall(request)

    # send some junk after the actual request
    sock.sendall(b'01234567890123456789')
    result = read_http(sock)
    self.assertEqual(result.body, b'a is a, body is a=a')
def test_012_ssl_server(self):
    """A POST body sent over SSL must be echoed at the end of the response."""
    def wsgi_app(environ, start_response):
        start_response('200 OK', {})
        return [environ['wsgi.input'].read()]

    here = os.path.dirname(__file__)
    certificate_file = os.path.join(here, 'test_server.crt')
    private_key_file = os.path.join(here, 'test_server.key')

    plain_listener = eventlet.listen(('localhost', 0))
    server_sock = eventlet.wrap_ssl(plain_listener,
                                    certfile=certificate_file,
                                    keyfile=private_key_file,
                                    server_side=True)
    self.spawn_server(sock=server_sock, site=wsgi_app)

    client = eventlet.wrap_ssl(eventlet.connect(self.server_addr))
    client.write(
        b'POST /foo HTTP/1.1\r\nHost: localhost\r\n'
        b'Connection: close\r\nContent-length:3\r\n\r\nabc')
    response = recvall(client)
    assert response.endswith(b'abc')
def test_013_empty_return(self):
    """An app returning ``[b""]`` over SSL gives a response ending in a blank line."""
    def wsgi_app(environ, start_response):
        start_response("200 OK", [])
        return [b""]

    here = os.path.dirname(__file__)
    certificate_file = os.path.join(here, 'test_server.crt')
    private_key_file = os.path.join(here, 'test_server.key')

    plain_listener = eventlet.listen(('localhost', 0))
    server_sock = eventlet.wrap_ssl(plain_listener,
                                    certfile=certificate_file,
                                    keyfile=private_key_file,
                                    server_side=True)
    self.spawn_server(sock=server_sock, site=wsgi_app)

    port = server_sock.getsockname()[1]
    client = eventlet.wrap_ssl(eventlet.connect(('localhost', port)))
    client.write(b'GET /foo HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
    response = recvall(client)
    assert response[-4:] == b'\r\n\r\n'
def test_016_repeated_content_length(self):
    """content-length header was being doubled up if it was set in
    start_response and could also be inferred from the iterator
    """
    def wsgi_app(environ, start_response):
        start_response('200 OK', [('Content-Length', '7')])
        return [b'testing']

    self.site.application = wsgi_app
    sock = eventlet.connect(self.server_addr)
    stream = sock.makefile('rwb')
    stream.write(b'GET /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
    stream.flush()
    # Collect every line up to, but not including, the blank line that
    # ends the header block.
    header_lines = list(iter(stream.readline, b'\r\n'))
    content_length_lines = [
        line for line in header_lines
        if line.lower().startswith(b'content-length')
    ]
    self.assertEqual(1, len(content_length_lines))
def test_018_http_10_keepalive(self):
    """Two HTTP/1.0 keep-alive requests on one socket must both be answered."""
    # verify that if an http/1.0 client sends connection: keep-alive
    # that we don't close the connection
    keepalive_request = (
        b'GET / HTTP/1.0\r\nHost: localhost\r\nConnection: keep-alive\r\n\r\n')
    sock = eventlet.connect(self.server_addr)

    # The second pass repeats the request to verify the connection is
    # actually still open.
    for _ in range(2):
        sock.sendall(keepalive_request)
        response = read_http(sock)
        assert 'connection' in response.headers_lower
        self.assertEqual('keep-alive', response.headers_lower['connection'])
    sock.close()
def test_019_fieldstorage_compat(self):
    """An app that feeds ``wsgi.input`` to ``cgi.FieldStorage`` must respond.

    The request body is sent with chunked transfer encoding.
    """
    def use_fieldstorage(environ, start_response):
        cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ)
        start_response('200 OK', [('Content-type', 'text/plain')])
        return [b'hello!']

    self.site.application = use_fieldstorage
    chunked_request = (
        b'POST / HTTP/1.1\r\n'
        b'Host: localhost\r\n'
        b'Connection: close\r\n'
        b'Transfer-Encoding: chunked\r\n\r\n'
        b'2\r\noh\r\n'
        b'4\r\n hai\r\n0\r\n\r\n')
    sock = eventlet.connect(self.server_addr)
    sock.sendall(chunked_request)
    assert b'hello!' in recvall(sock)
def test_020_x_forwarded_for(self):
    """``X-Forwarded-For`` addresses are logged unless the option is off.

    With the server as already spawned, the log must contain the forwarded
    addresses followed by ``127.0.0.1``.  After respawning with
    ``log_x_forwarded_for=False`` only ``127.0.0.1`` may appear.
    """
    request_bytes = (
        b'GET / HTTP/1.1\r\nHost: localhost\r\n'
        b'X-Forwarded-For: 1.2.3.4, 5.6.7.8\r\n\r\n'
    )

    sock = eventlet.connect(self.server_addr)
    sock.sendall(request_bytes)
    sock.recv(1024)
    sock.close()
    assert '1.2.3.4,5.6.7.8,127.0.0.1' in self.logfile.getvalue()

    # turning off the option should work too
    self.logfile = six.StringIO()
    self.spawn_server(log_x_forwarded_for=False)

    sock = eventlet.connect(self.server_addr)
    sock.sendall(request_bytes)
    sock.recv(1024)
    sock.close()
    assert '1.2.3.4' not in self.logfile.getvalue()
    assert '5.6.7.8' not in self.logfile.getvalue()
    assert '127.0.0.1' in self.logfile.getvalue()
def test_021_environ_clobbering(self):
    """The server must still answer 200 when the app sets environ keys to None."""
    clobbered_keys = (
        'wsgi.version', 'wsgi.url_scheme',
        'wsgi.input', 'wsgi.errors', 'wsgi.multithread',
        'wsgi.multiprocess', 'wsgi.run_once', 'REQUEST_METHOD',
        'SCRIPT_NAME', 'RAW_PATH_INFO', 'PATH_INFO', 'QUERY_STRING',
        'CONTENT_TYPE', 'CONTENT_LENGTH', 'SERVER_NAME', 'SERVER_PORT',
        'SERVER_PROTOCOL',
    )

    def clobberin_time(environ, start_response):
        for key in clobbered_keys:
            environ[key] = None
        start_response('200 OK', [('Content-type', 'text/plain')])
        return []

    self.site.application = clobberin_time
    request = (
        b'GET / HTTP/1.1\r\n'
        b'Host: localhost\r\n'
        b'Connection: close\r\n'
        b'\r\n\r\n')
    sock = eventlet.connect(self.server_addr)
    sock.sendall(request)
    assert b'200 OK' in recvall(sock)
def test_023_bad_content_length(self):
    """Invalid ``Content-length`` values must yield 400, never 500.

    Covers a non-numeric value, an empty value, and a value made of a
    single space, each on its own connection.
    """
    bad_requests = (
        b'GET / HTTP/1.0\r\nHost: localhost\r\nContent-length: argh\r\n\r\n',
        b'GET / HTTP/1.0\r\nHost: localhost\r\nContent-length:\r\n\r\n',
        b'GET / HTTP/1.0\r\nHost: localhost\r\nContent-length: \r\n\r\n',
    )
    for request in bad_requests:
        sock = eventlet.connect(self.server_addr)
        sock.sendall(request)
        result = recvall(sock)
        assert result.startswith(b'HTTP'), result
        assert b'400 Bad Request' in result, result
        assert b'500' not in result, result
def test_error_in_chunked_closes_connection(self):
    """After the two expected chunks, the next ``recv`` must return ``b''``.

    The server runs ``chunked_fail_app`` with ``minimum_chunk_size=1``.
    """
    # From http://rhodesmill.org/brandon/2013/chunked-wsgi/
    self.spawn_server(minimum_chunk_size=1)
    self.site.application = chunked_fail_app

    sock = eventlet.connect(self.server_addr)
    sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
    response = read_http(sock)

    self.assertEqual(response.status, 'HTTP/1.1 200 OK')
    transfer_encoding = response.headers_lower.get('transfer-encoding')
    self.assertEqual(transfer_encoding, 'chunked')
    expected_body = (
        b'27\r\nThe dwarves of yore made mighty spells,\r\n'
        b'25\r\nWhile hammers fell like ringing bells\r\n')
    self.assertEqual(response.body, expected_body)

    # verify that socket is closed by server
    self.assertEqual(sock.recv(1), b'')
def test_zero_length_chunked_response(self):
    """An app yielding only ``b""`` must produce a single zero-size chunk."""
    def zero_chunked_app(env, start_response):
        start_response('200 OK', [('Content-type', 'text/plain')])
        yield b""

    self.site.application = zero_chunked_app
    sock = eventlet.connect(self.server_addr)
    sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
    remaining = recvall(sock).split(b'\r\n')

    # Move lines from the front of the response into ``headers`` up to and
    # including the first empty line.
    headers = []
    line = None
    while line != b'':
        line = remaining.pop(0)
        headers.append(line)
    assert b'Transfer-Encoding: chunked' in b''.join(headers), headers
    # should only be one chunk of zero size with two blank lines
    # (one terminates the chunk, one terminates the body)
    self.assertEqual(remaining, [b'0', b'', b''])