def test_exhaustion(self):
    """Check that a getter waiting on an emptied pool receives a returned item.

    A consumer greenthread is spawned, the test then takes four items from
    ``self.pool`` and asserts that ``free()`` reports 0, yields once so the
    consumer can run, puts the first item back and expects the consumer to
    hand that same item over through a zero-size queue.
    """
    handoff = Queue(0)

    def consumer():
        # Report back through the queue no matter how get() ends, so the
        # final handoff.get() below always receives something.
        item = None
        try:
            item = self.pool.get()
        finally:
            handoff.put(item)

    eventlet.spawn(consumer)

    # Take four items in sequence and keep them all referenced.
    held = [self.pool.get() for _ in range(4)]
    first = held[0]
    self.assertEqual(self.pool.free(), 0)

    # Let the consumer run; nothing will be in the pool, so it will wait.
    eventlet.sleep(0)

    # Wake the consumer by returning one item.
    self.pool.put(first)

    # Wait for the consumer and check which item it obtained.
    self.assertEqual(handoff.get(), first)
# Example source code using Python's sleep()
def test_create_contention(self):
    """Check that a pool with ``max_size=4`` calls ``create`` exactly 4 times.

    The ``create`` callable yields (``eventlet.sleep()``) while building an
    item, and 100 greenthreads each get and put back one item.
    """
    create_calls = [0]  # one-element list so the closure can mutate it

    def sleep_create():
        create_calls[0] += 1
        eventlet.sleep()
        return "slept"

    pool = pools.Pool(max_size=4, create=sleep_create)

    def do_get():
        item = pool.get()
        self.assertEqual(item, "slept")
        pool.put(item)

    workers = eventlet.GreenPool()
    for _ in six.moves.range(100):
        workers.spawn_n(do_get)
    workers.waitall()

    self.assertEqual(create_calls[0], 4)
def test_no_leaking(self):
    """Check that objects stored on a ``corolocal.local`` are released.

    Each of 100 greenthreads creates an ``X`` instance, records it in a
    ``WeakKeyDictionary`` and stores it on the greenthread-local object.
    After the pool is dropped and garbage is collected, exactly one entry
    is expected to remain in the weak dictionary.
    """
    tracked = weakref.WeakKeyDictionary()
    my_local = corolocal.local()

    class X(object):
        pass

    def do_something(i):
        obj = X()
        tracked[obj] = True
        my_local.foo = obj

    pool = eventlet.GreenPool()
    for i in six.moves.range(100):
        pool.spawn(do_something, i)
    pool.waitall()

    # Drop the pool, then collect, yield once, and collect again.
    del pool
    gc.collect()
    eventlet.sleep(0)
    gc.collect()

    # at this point all our coros have terminated
    self.assertEqual(len(tracked), 1)
def test_recv_into_type():
    """Check that green ``recv_into`` returns an ``int``.

    Makes sure `_recv_loop` returns the correct value when `recv_meth` is of
    foo_into type (fills a buffer and returns number of bytes, not the data).
    Uses a real (unpatched) thread for the listening side.
    """
    real_threading = eventlet.patcher.original('threading')
    bound_addr = []

    def server():
        listener = eventlet.listen(('127.0.0.1', 0))
        # Publish the chosen address to the main thread via the shared list.
        bound_addr[:] = listener.getsockname()
        eventlet.sleep(0.2)

    server_thread = real_threading.Thread(target=server)
    server_thread.start()

    # Give the server thread time to bind and publish its address.
    eventlet.sleep(0.1)

    client = eventlet.connect(tuple(bound_addr))
    buf = array.array('B', b' ')
    received = client.recv_into(buf, 1)
    assert isinstance(received, int)
def handle(ws):
    """Websocket test handler that dispatches on ``ws.path``.

    ``/echo``  sends every received message back until ``wait()`` returns None.
    ``/range`` sends ten ``"msg N"`` messages, sleeping 0.01s after each.
    ``/error`` raises ``socket.error(errno.ENOTSOCK)``.
    Any other path closes the websocket.
    """
    route = ws.path

    if route == '/echo':
        message = ws.wait()
        while message is not None:
            ws.send(message)
            message = ws.wait()
        return

    if route == '/range':
        for number in range(10):
            ws.send("msg %d" % number)
            eventlet.sleep(0.01)
        return

    if route == '/error':
        # some random socket error that we shouldn't normally get
        raise socket.error(errno.ENOTSOCK)

    ws.close()
def test_sending_messages_to_websocket_75(self):
    """Send framed messages to ``/echo`` over a raw socket and check echoes.

    After the upgrade request, one message is sent in a single write and a
    second one is split across two writes; each must come back intact.
    """
    handshake_lines = [
        "GET /echo HTTP/1.1",
        "Upgrade: WebSocket",
        "Connection: Upgrade",
        "Host: %s:%s" % self.server_addr,
        "Origin: http://%s:%s" % self.server_addr,
        "WebSocket-Protocol: ws",
    ]
    request = '\r\n'.join(handshake_lines) + '\r\n\r\n'

    sock = eventlet.connect(self.server_addr)
    sock.sendall(six.b(request))
    sock.recv(1024)  # read and discard the server's reply to the upgrade

    # A complete frame sent in one piece.
    sock.sendall(b'\x00hello\xFF')
    echoed = sock.recv(1024)
    self.assertEqual(echoed, b'\x00hello\xff')

    # A frame split across two writes with a short pause in between.
    sock.sendall(b'\x00start')
    eventlet.sleep(0.001)
    sock.sendall(b' end\xff')
    echoed = sock.recv(1024)
    self.assertEqual(echoed, b'\x00start end\xff')

    sock.shutdown(socket.SHUT_RDWR)
    sock.close()
    eventlet.sleep(0.01)
def _test_multiple_waiters(self, exception):
    """Check that one ``send()`` wakes every greenthread waiting on an event.

    :param exception: when true, each waiter raises ``Exception`` after it
        has recorded its result and signalled completion.
    """
    gate = event.Event()
    results = []

    def wait_on_event(i_am_done):
        gate.wait()
        results.append(True)
        i_am_done.send()
        if exception:
            raise Exception()

    count = 5
    done_events = []
    for _ in range(count):
        done = event.Event()
        done_events.append(done)
        eventlet.spawn_n(wait_on_event, done)

    eventlet.sleep()  # allow spawns to start executing
    gate.send()

    for done in done_events:
        done.wait()

    self.assertEqual(len(results), count)
def test_semaphore_contention():
    """Check that two workers sharing a semaphore make similar progress.

    Both workers increment their own counter while holding the semaphore.
    After 0.5s they are killed and the counters must differ by less than
    10% of the smaller one.
    """
    g_mutex = eventlet.Semaphore()
    counts = [0, 0]

    def worker(no):
        # `no` is 1-based; it selects this worker's slot in `counts`.
        while min(counts) < 200:
            with g_mutex:
                counts[no - 1] += 1
                eventlet.sleep(0.001)

    first = eventlet.spawn(worker, no=1)
    second = eventlet.spawn(worker, no=2)
    eventlet.sleep(0.5)
    for green in (first, second):
        green.kill()

    spread = abs(counts[0] - counts[1])
    tolerance = int(min(counts) * 0.1)
    assert spread < tolerance, counts
def chunked_fail_app(environ, start_response):
    """WSGI app that streams two chunks and then fails mid-response.

    After a ``200 OK`` start and two body chunks, a ``ZeroDivisionError`` is
    provoked; ``start_response`` is called again with ``'500 Error'`` and the
    exception info, and the generator ends.

    See http://rhodesmill.org/brandon/2013/chunked-wsgi/
    """
    response_headers = [('Content-Type', 'text/plain')]
    start_response('200 OK', response_headers)

    # We start streaming data just fine.
    opening_lines = (
        b"The dwarves of yore made mighty spells,",
        b"While hammers fell like ringing bells",
    )
    for line in opening_lines:
        yield line

    # Then the back-end fails!
    try:
        1 / 0
    except Exception:
        failure = sys.exc_info()
        start_response('500 Error', response_headers, failure)
        return

    # So rest of the response data is not available.
    closing_lines = (
        b"In places deep, where dark things sleep,",
        b"In hollow halls beneath the fells.",
    )
    for line in closing_lines:
        yield line
def test_025_accept_errors(self):
    """Check that a server given a non-listening socket logs the accept error.

    The socket is bound but ``listen()`` is deliberately never called.
    Connecting must fail with ``ECONNREFUSED`` and the captured stderr must
    contain ``'Invalid argument'``.

    Hub exception printing is switched on for the duration of the test and
    switched off again in a ``finally`` block, so a failing assertion does
    not leave the setting enabled for whatever runs afterwards.
    """
    debug.hub_exceptions(True)
    try:
        listener = greensocket.socket()
        listener.bind(('localhost', 0))
        # NOT calling listen, to trigger the error
        with capture_stderr() as log:
            self.spawn_server(sock=listener)
            eventlet.sleep(0)  # need to enter server loop
            try:
                eventlet.connect(self.server_addr)
                self.fail("Didn't expect to connect")
            except socket.error as exc:
                self.assertEqual(support.get_errno(exc), errno.ECONNREFUSED)

        log_content = log.getvalue()
        assert 'Invalid argument' in log_content, log_content
    finally:
        # Runs on both success and failure paths.
        debug.hub_exceptions(False)
def test_ipv6(self):
    """Check that ``wsgi.server`` starts on an IPv6 listening socket.

    Returns silently when an IPv6 socket cannot be created. Otherwise the
    server is run in a separate thread with a ``six.StringIO`` log; the test
    polls until something has been logged and fails if the server thread
    recorded the ``'broken'`` marker (written when ``wsgi.server`` raises
    ``ValueError``).
    """
    try:
        sock = eventlet.listen(('::1', 0), family=socket.AF_INET6)
    except (socket.gaierror, socket.error):  # probably no ipv6
        return
    log = six.StringIO()

    # first thing the server does is try to log the IP it's bound to
    def run_server():
        try:
            wsgi.server(sock=sock, log=log, site=Site())
        except ValueError:
            # Native str, not bytes: `log` is a text buffer, and the marker
            # must match the substring searched for below.
            log.write('broken')

    self.spawn_thread(run_server)

    logval = log.getvalue()
    while not logval:
        eventlet.sleep(0.0)
        logval = log.getvalue()

    if 'broken' in logval:
        self.fail('WSGI server raised exception with ipv6 socket')
def test_001_trampoline_timeout(self):
    """Check that ``hubs.trampoline`` raises ``eventlet.Timeout``.

    A local server accepts the connection and then just sleeps, and the
    client waits for readability with a 1ms timeout.
    """
    listener = eventlet.listen(('127.0.0.1', 0))
    port = listener.getsockname()[1]

    def server(sock):
        # Keep the accepted connection bound to locals while sleeping.
        client, addr = sock.accept()
        eventlet.sleep(0.1)

    server_gt = eventlet.spawn(server, listener)
    eventlet.sleep(0)

    timed_out = False
    try:
        desc = eventlet.connect(('127.0.0.1', port))
        hubs.trampoline(desc, read=True, write=False, timeout=0.001)
    except eventlet.Timeout:
        timed_out = True  # test passed
    assert timed_out, "Didn't timeout"

    server_gt.wait()
    check_hub()
def test_max_size(self):
    """Check that ``put`` blocks once a ``Queue(2)`` holds two items.

    The putter records each item after its ``put`` returns; the third item
    is only recorded after the test has taken one item out.
    """
    q = eventlet.Queue(2)
    results = []

    def putter(target):
        for item in ('a', 'b', 'c'):
            target.put(item)
            results.append(item)

    producer = eventlet.spawn(putter, q)

    eventlet.sleep(0)
    self.assertEqual(results, ['a', 'b'])

    # Taking one item makes room for the third put.
    self.assertEqual(q.get(), 'a')
    eventlet.sleep(0)
    self.assertEqual(results, ['a', 'b', 'c'])

    self.assertEqual(q.get(), 'b')
    self.assertEqual(q.get(), 'c')
    producer.wait()
def test_zero_max_size(self):
    """Check that ``put`` on a ``Queue(0)`` completes only once a getter exists.

    The sender fires its event after ``put`` returns; the event must not be
    ready before a receiver has been spawned.
    """
    q = eventlet.Queue(0)
    done = event.Event()

    def sender(evt, q):
        q.put('hi')
        evt.send('done')

    def receiver(q):
        return q.get()

    sender_gt = eventlet.spawn(sender, done, q)
    eventlet.sleep(0)
    assert not done.ready()

    receiver_gt = eventlet.spawn(receiver, q)
    self.assertEqual(receiver_gt.wait(), 'hi')
    self.assertEqual(done.wait(), 'done')
    sender_gt.wait()
def test_multiple_waiters(self):
    """Check that multiple waiters on one queue each get a result back."""
    q = eventlet.Queue()
    sendings = ['1', '2', '3', '4']

    getters = [eventlet.spawn(q.get) for _ in sendings]
    eventlet.sleep(0.01)  # get 'em all waiting

    for item in sendings:
        q.put(item)

    # Compared as sets, so the order of delivery is not checked.
    received = {getter.wait() for getter in getters}
    self.assertEqual(received, set(sendings))
def test_channel_waiters(self):
    """Check ``getting()`` on a ``Queue(0)`` with three getters and putters.

    Three getters are spawned and counted, then three putters deliver
    1, 2 and 3; afterwards no getter is waiting and every value arrived.
    """
    channel = eventlet.Queue(0)

    getters = [eventlet.spawn(channel.get) for _ in range(3)]
    eventlet.sleep(0)
    self.assertEqual(channel.getting(), 3)

    putters = [eventlet.spawn(channel.put, value) for value in (1, 2, 3)]
    for putter in putters:
        putter.wait()
    self.assertEqual(channel.getting(), 0)

    # NOTE: we don't guarantee that waiters are served in order
    received = [getter.wait() for getter in getters]
    received.sort()
    self.assertEqual(received, [1, 2, 3])
def test_get_nowait_unlock(self):
    """Check that ``get_nowait`` scheduled on the hub unblocks a waiting putter.

    A greenthread is spawned to ``put(5)`` on a ``Queue(0)``.
    ``q.get_nowait`` is then scheduled through the hub (via ``store_result``)
    and must deliver 5, after which the putter is dead. The queue reports
    both ``empty()`` and ``full()`` at every checkpoint.
    """
    hub = hubs.get_hub()
    result = []
    q = eventlet.Queue(0)

    def check_empty_and_full():
        assert q.empty(), q
        assert q.full(), q

    putter = eventlet.spawn(q.put, 5)
    check_empty_and_full()

    eventlet.sleep(0)
    check_empty_and_full()

    hub.schedule_call_global(0, store_result, result, q.get_nowait)
    eventlet.sleep(0)
    check_empty_and_full()
    assert result == [5], result

    # TODO add ready to greenthread
    # assert putter.ready(), putter
    assert putter.dead, putter
    assert q.empty(), q
# put_nowait must work from the mainloop
def test_put_nowait_unlock(self):
    """Check that ``put_nowait`` scheduled on the hub works with a waiting getter.

    A greenthread is spawned to ``get()`` from a ``Queue(0)``.
    ``q.put_nowait(10)`` is then scheduled through the hub (via
    ``store_result``) and the stored result must be ``[None]``. The queue
    reports both ``empty()`` and ``full()`` at every checkpoint.
    """
    hub = hubs.get_hub()
    result = []
    q = eventlet.Queue(0)

    def check_empty_and_full():
        assert q.empty(), q
        assert q.full(), q

    eventlet.spawn(q.get)
    check_empty_and_full()

    eventlet.sleep(0)
    check_empty_and_full()

    hub.schedule_call_global(0, store_result, result, q.put_nowait, 10)
    # TODO ready method on greenthread
    # assert not p.ready(), p
    eventlet.sleep(0)
    assert result == [None], result

    # TODO ready method
    # assert p.ready(), p
    assert q.full(), q
    assert q.empty(), q
def handle(ws):
    """Websocket test handler that dispatches on ``ws.path``.

    ``/echo``  sends every received message back until ``wait()`` returns None.
    ``/range`` sends ten ``"msg N"`` messages, sleeping 0.01s after each.
    ``/error`` raises ``socket.error(errno.ENOTSOCK)``.
    Any other path closes the websocket.
    """
    route = ws.path

    if route == '/echo':
        message = ws.wait()
        while message is not None:
            ws.send(message)
            message = ws.wait()
        return

    if route == '/range':
        for number in range(10):
            ws.send("msg %d" % number)
            eventlet.sleep(0.01)
        return

    if route == '/error':
        # some random socket error that we shouldn't normally get
        raise socket.error(errno.ENOTSOCK)

    ws.close()
def test_send_recv_13(self):
    """Exchange messages with ``/echo`` using a version-13 websocket client.

    After the upgrade request, the raw socket is wrapped in a client-side
    ``RFC6455WebSocket``; bytes and text messages are sent and must be
    echoed back unchanged and in order.
    """
    handshake_lines = [
        "GET /echo HTTP/1.1",
        "Upgrade: websocket",
        "Connection: Upgrade",
        "Host: %s:%s" % self.server_addr,
        "Origin: http://%s:%s" % self.server_addr,
        "Sec-WebSocket-Version: 13",
        "Sec-WebSocket-Key: d9MXuOzlVQ0h+qRllvSCIg==",
    ]
    request = '\r\n'.join(handshake_lines) + '\r\n\r\n'

    sock = eventlet.connect(self.server_addr)
    sock.sendall(six.b(request))
    sock.recv(1024)  # read and discard the server's reply to the upgrade

    ws = websocket.RFC6455WebSocket(sock, {}, client=True)

    ws.send(b'hello')
    assert ws.wait() == b'hello'

    # Two messages queued back to back, then read in the same order.
    ws.send(b'hello world!\x01')
    ws.send(u'hello world again!')
    assert ws.wait() == b'hello world!\x01'
    assert ws.wait() == u'hello world again!'

    ws.close()
    eventlet.sleep(0.01)