def get_image_data(self):
"""This function will:
1). export VDI as VHD stream;
2). make gzipped tarball from the VHD stream;
3). read from the tarball stream.and return the iterable data.
"""
tarpipe_out, tarpipe_in = utils.create_pipe()
pool = eventlet.GreenPool()
pool.spawn(self.start_image_stream_generator, tarpipe_in)
try:
while True:
data = tarpipe_out.read(CHUNK_SIZE)
if not data:
break
yield data
except Exception:
LOG.debug("Failed to read chunks from the tarfile "
"stream.")
raise
finally:
tarpipe_out.close()
pool.waitall()
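# A minimal standalone sketch of the same pattern (writer greenthread
# feeding a pipe, caller reading it lazily). utils.create_pipe in the
# original is a helper; here we assume a plain os.pipe() wrapped in
# eventlet.greenio.GreenPipe gives equivalent green file objects.
import os
import eventlet
from eventlet import greenio

CHUNK_SIZE = 4 * 1024

def stream_chunks():
    read_fd, write_fd = os.pipe()
    reader = greenio.GreenPipe(read_fd, 'rb', 0)
    writer = greenio.GreenPipe(write_fd, 'wb', 0)

    def produce(out):
        try:
            for _ in range(3):
                out.write(b'x' * CHUNK_SIZE)
        finally:
            out.close()  # closing the write end gives the reader EOF

    pool = eventlet.GreenPool()
    pool.spawn(produce, writer)
    try:
        while True:
            data = reader.read(CHUNK_SIZE)
            if not data:
                break
            yield data
    finally:
        reader.close()
        pool.waitall()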
def test_runtestmulti(self):
class MyConfig:
class MyOption:
numproc = 7
option = MyOption()
l = []
def MyGreenPool(**kw):
l.append(kw)
# Building a Detox object will already call GreenPool(),
# so we have to let MyGreenPool be called twice before raising
if len(l) == 2:
raise ValueError
from detox import proc
setattr(proc, 'GreenPool', MyGreenPool)
with pytest.raises(ValueError):
d = proc.Detox(MyConfig())
d.runtestsmulti(['env1', 'env2', 'env3']) # Fake env list
assert l[0] == {} # When building Detox object
assert l[1] == {'size': 7} # When calling runtestsmulti
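# Note: the setattr above replaces proc.GreenPool for the remainder of
# the test run. A sketch of the same substitution via pytest's
# monkeypatch fixture, which restores the attribute automatically
# (the test name and fake below are illustrative, not detox source):
def test_runtestmulti_patched(monkeypatch):
    calls = []
    from detox import proc
    monkeypatch.setattr(proc, 'GreenPool', lambda **kw: calls.append(kw))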
def run_server(self):
"""Run a WSGI server."""
eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
eventlet.hubs.use_hub('poll')
eventlet.patcher.monkey_patch(all=False, socket=True)
self.pool = eventlet.GreenPool(size=self.threads)
socket_timeout = cfg.CONF.eventlet_opts.client_socket_timeout or None
try:
eventlet.wsgi.server(
self.sock, self.application,
custom_pool=self.pool,
url_length_limit=URL_LENGTH_LIMIT,
log=self._wsgi_logger,
debug=cfg.CONF.debug,
keepalive=cfg.CONF.eventlet_opts.wsgi_keep_alive,
socket_timeout=socket_timeout)
except socket.error as err:
if err[0] != errno.EINVAL:
raise
self.pool.waitall()
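# A minimal, self-contained version of the serving pattern above,
# without the oslo.config plumbing (the port and pool size here are
# arbitrary illustrative values):
import eventlet
from eventlet import wsgi

def hello_app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'hello\n']

listener = eventlet.listen(('127.0.0.1', 8080))
client_pool = eventlet.GreenPool(size=100)  # caps concurrent client handlers
wsgi.server(listener, hello_app, custom_pool=client_pool)  # blocks until killed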
def __init__(self,server_addr=None,connect_cb=None,disconnect_cb=None,voxel_update_cb=None,avatar_pos_cb=None):
""" server_addr is a tuple of (ip,port) - this should usually be something on localhost for security reasons
connect_cb and disconnect_cb are callback functions that will be invoked upon successful connect/disconnect - they have no params
voxel_update_cb is called when a visible voxel is updated and is passed a voxel object as the only parameter
avatar_pos_cb is called when the AI avatar moves and is passed a tuple representing the new coordinates
"""
self.server_addr = server_addr
self.connected = False
self.ready = False
self.connect_id = None
self.connect_cb = connect_cb
self.disconnect_cb = disconnect_cb
self.voxel_update_cb = voxel_update_cb
self.avatar_pos_cb = avatar_pos_cb
self.avatar_pos = None
self.pool = eventlet.GreenPool(1000)
self.handlers = {MSGTYPE_CONNECT_ACK: self.handle_connect_ack,
MSGTYPE_VISUAL_RANGE: self.handle_visual_range,
MSGTYPE_VOXEL_UPDATE: self.handle_voxel_update,
MSGTYPE_BULK_VOXEL_UPDATE: self.handle_bulk_voxel,
MSGTYPE_AVATAR_POS: self.handle_avatar_pos}
self.sock = yatesock.YATESocket(handlers=self.handlers)
self.visual_range = None
if self.server_addr is not None: self.connect_to(self.server_addr)
def __init__(self, host, client_factory, vppf, physnets):
self.host = host
self.client_factory = client_factory
self.vppf = vppf
self.physnets = physnets
self.pool = eventlet.GreenPool()
self.secgroup_enabled = cfg.CONF.SECURITYGROUP.enable_security_group
# These data structures are used as readiness indicators.
# A port is in here only if the attachment part of binding
# has completed.
# key: interface index in VPP; value: (ID, bound-callback, vpp-prop-dict)
self.iface_state = {}
# Members of this are ports requiring security groups with unsatisfied
# requirements.
self.iface_awaiting_secgroups = {}
# We also need to know if the vhostuser interface has seen a socket
# connection: this tells us there's a state change, and there is
# a state detection function on self.vppf.
self.vppf.vhost_ready_callback = self._vhost_ready
def producer(start_url):
"""Recursively crawl starting from *start_url*. Returns a set of
urls that were found."""
pool = eventlet.GreenPool()
seen = set()
q = eventlet.Queue()
q.put(start_url)
# keep looping if there are new urls, or workers that may produce more urls
while True:
while not q.empty():
url = q.get()
# limit requests to eventlet.net so we don't crash all over the internet
if url not in seen and 'eventlet.net' in url:
seen.add(url)
pool.spawn_n(fetch, url, q)
pool.waitall()
if q.empty():
break
return seen
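# Usage sketch: this producer follows eventlet's webcrawler example,
# where fetch() is assumed to GET the url and put discovered links on
# the queue.
seen = producer("http://eventlet.net")
print("%d urls seen" % len(seen))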
def test_create_contention(self):
creates = [0]
def sleep_create():
creates[0] += 1
eventlet.sleep()
return "slept"
p = pools.Pool(max_size=4, create=sleep_create)
def do_get():
x = p.get()
self.assertEqual(x, "slept")
p.put(x)
gp = eventlet.GreenPool()
for i in six.moves.range(100):
gp.spawn_n(do_get)
gp.waitall()
self.assertEqual(creates[0], 4)
def test_no_leaking(self):
refs = weakref.WeakKeyDictionary()
my_local = corolocal.local()
class X(object):
pass
def do_something(i):
o = X()
refs[o] = True
my_local.foo = o
p = eventlet.GreenPool()
for i in six.moves.range(100):
p.spawn(do_something, i)
p.waitall()
del p
gc.collect()
eventlet.sleep(0)
gc.collect()
# at this point all our coros have terminated
self.assertEqual(len(refs), 1)
def test_close_idle(self):
pool = eventlet.GreenPool()
# use log=stderr when test runner can capture it
self.spawn_server(custom_pool=pool, log=sys.stdout)
connect = (
'GET /echo HTTP/1.1',
'Upgrade: WebSocket',
'Connection: Upgrade',
'Host: %s:%s' % self.server_addr,
'Origin: http://%s:%s' % self.server_addr,
'Sec-WebSocket-Protocol: ws',
'Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5',
'Sec-WebSocket-Key2: 12998 5 Y3 1 .P00',
)
sock = eventlet.connect(self.server_addr)
sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
sock.recv(1024)
sock.sendall(b'\x00hello\xff')
result = sock.recv(1024)
assert result == b'\x00hello\xff'
self.killer.kill(KeyboardInterrupt)
with eventlet.Timeout(1):
pool.waitall()
def test_close_idle_connections(self):
self.reset_timeout(2)
pool = eventlet.GreenPool()
self.spawn_server(custom_pool=pool)
# https://github.com/eventlet/eventlet/issues/188
sock = eventlet.connect(self.server_addr)
sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
result = read_http(sock)
assert result.status == 'HTTP/1.1 200 OK', 'Received status {0!r}'.format(result.status)
self.killer.kill(KeyboardInterrupt)
try:
with eventlet.Timeout(1):
pool.waitall()
except Exception:
assert False, self.logfile.getvalue()
def test_multiple_coros(self):
evt = eventlet.Event()
results = []
def producer():
results.append('prod')
evt.send()
def consumer():
results.append('cons1')
evt.wait()
results.append('cons2')
pool = eventlet.GreenPool(2)
done = pool.spawn(consumer)
pool.spawn_n(producer)
done.wait()
self.assertEqual(['cons1', 'prod', 'cons2'], results)
def test_reentrant(self):
pool = eventlet.GreenPool(1)
def reenter():
waiter = pool.spawn(lambda a: a, 'reenter')
self.assertEqual('reenter', waiter.wait())
outer_waiter = pool.spawn(reenter)
outer_waiter.wait()
evt = eventlet.Event()
def reenter_async():
pool.spawn_n(lambda a: a, 'reenter')
evt.send('done')
pool.spawn_n(reenter_async)
self.assertEqual('done', evt.wait())
def test_imap_raises(self):
# testing the case where the function raises an exception;
# both that the caller sees that exception, and that the iterator
# continues to be usable to get the rest of the items
p = eventlet.GreenPool(4)
def raiser(item):
if item == 1 or item == 7:
raise RuntimeError("intentional error")
else:
return item
it = p.imap(raiser, range(10))
results = []
while True:
try:
results.append(six.next(it))
except RuntimeError:
results.append('r')
except StopIteration:
break
self.assertEqual(results, [0, 'r', 2, 3, 4, 5, 6, 'r', 8, 9])
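# imap in brief: results come back in input order, and a worker's
# exception is re-raised from next() at that item's slot, which is
# exactly what the test above checks. A standalone sketch:
import eventlet

pool = eventlet.GreenPool(4)
for square in pool.imap(lambda n: n * n, range(5)):
    print(square)  # 0 1 4 9 16, in input order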
def test_with_intpool(self):
class IntPool(pools.Pool):
def create(self):
self.current_integer = getattr(self, 'current_integer', 0) + 1
return self.current_integer
def subtest(intpool_size, pool_size, num_executes):
def run(int_pool):
token = int_pool.get()
eventlet.sleep(0.0001)
int_pool.put(token)
return token
int_pool = IntPool(max_size=intpool_size)
pool = eventlet.GreenPool(pool_size)
for ix in six.moves.range(num_executes):
pool.spawn(run, int_pool)
pool.waitall()
subtest(4, 7, 7)
subtest(50, 75, 100)
for isize in (10, 20, 30, 40, 50):
for psize in (5, 25, 35, 50):
subtest(isize, psize, psize)
def get_asynchronous_eventlet_pool(size=1000):
"""Return eventlet pool to caller.
Also store pools created in global list, to wait on
it after getting signal for graceful shutdown.
:param size: eventlet pool size
:returns: eventlet pool
"""
global ASYNC_EVENTLET_THREAD_POOL_LIST
pool = eventlet.GreenPool(size=size)
# Add pool to global ASYNC_EVENTLET_THREAD_POOL_LIST
ASYNC_EVENTLET_THREAD_POOL_LIST.append(pool)
return pool
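# Sketch of the graceful-shutdown side the docstring refers to: on the
# shutdown signal, drain every registered pool before exiting (the
# function name and wiring here are assumptions, not original source):
def wait_for_async_pools():
    for pool in ASYNC_EVENTLET_THREAD_POOL_LIST:
        pool.waitall()  # block until every spawned greenthread finishes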
def start(self):
# Start thread to generate tgz and write tgz data into tarpipe_in.
with tarfile.open(fileobj=self.tarpipe_in, mode='w|gz') as tar_file:
# we only need to export the leaf VDI.
vdi_uuid = self.vdi_uuids[0]
vdi_ref = self.session.VDI.get_by_uuid(vdi_uuid)
vhd_stream = self._connect_request(vdi_ref)
tar_info = tarfile.TarInfo('0.vhd')
try:
# the VHD must be a dynamic hard disk, otherwise it will raise a
# VhdDiskTypeNotSupported exception when parsing the VHD file.
vhd_DynDisk = vhd_utils.VHDDynDiskParser(vhd_stream)
tar_info.size = vhd_DynDisk.get_vhd_file_size()
LOG.debug("VHD size for tarfile is %d" % tar_info.size)
vhdpipe_out, vhdpipe_in = utils.create_pipe()
pool = eventlet.GreenPool()
pool.spawn(self.convert_vhd_to_tar, vhdpipe_out,
tar_file, tar_info)
try:
self._vhd_to_pipe(vhd_DynDisk, vhdpipe_in)
finally:
vhdpipe_in.close()
pool.waitall()
finally:
self._clean()
def runtestsmulti(self, envlist):
pool = GreenPool(size=self._toxconfig.option.numproc)
for env in envlist:
pool.spawn_n(self.runtests, env)
pool.waitall()
if not self.toxsession.config.option.sdistonly:
retcode = self._toxsession._summary()
return retcode
def __init__(self, providerbase):
self._providerbase = providerbase
self._spec2thread = {}
self._pool = GreenPool()
self._resources = {}
def test_getresources_parallel(self):
l = []
queue = eventlet.Queue()
class Provider:
def provide_abc(self):
l.append(1)
return 42
resources = Resources(Provider())
pool = eventlet.GreenPool(2)
pool.spawn(lambda: resources.getresources("abc"))
pool.spawn(lambda: resources.getresources("abc"))
pool.waitall()
assert len(l) == 1
def start_wsgi(self):
if self.conf.workers == 0:
# Useful for profiling, testing, debugging, etc.
self.pool = eventlet.GreenPool(size=self.threads)
self.pool.spawn_n(self._single_run, self.application, self.sock)
return
LOG.info(_LI("Starting %d workers"), self.conf.workers)
signal.signal(signal.SIGTERM, self.kill_children)
signal.signal(signal.SIGINT, self.kill_children)
signal.signal(signal.SIGHUP, self.hup)
while len(self.children) < self.conf.workers:
self.run_child()
def start_api_and_rpc_workers(self):
"""Initializes eventlet and starts wait for workers to exit.
Spawns the workers returned from serve_rpc
"""
pool = eventlet.GreenPool()
quark_rpc = self.serve_rpc()
pool.spawn(quark_rpc.wait)
pool.waitall()
def __init__(self,scr):
self.scr = scr
curses.curs_set(0)
self.init_color_pairs()
curses.init_pair(TOPSTATUS,TOPSTATUS_FG,TOPSTATUS_BG)
self.scr.nodelay(1)
self.running = False
self.y,self.x = self.scr.getbegyx()
self.h,self.w = self.scr.getmaxyx()
self.av_pos = (0,0,0)
self.init_log()
self.init_voxel_display()
self.percept_delay = 0
self.cmdfuncs = {'help':self.helpfunc}
self.disp_func = self.log_display
self.client = yateclient.YATEClient(voxel_update_cb=self.voxel_update_cb,avatar_pos_cb=self.avatar_pos_cb)
self.running = True
yatelog.info('yate_console','Starting up')
self.draw_scr()
self.pool = eventlet.GreenPool(100)
self.pool.spawn(self.main_ui_loop)
while self.running: eventlet.greenthread.sleep(1)
curses.curs_set(1)
def __init__(self,driver,verbose=False):
self.logger = yatelog.get_logger()
if verbose: self.logger.setLevel(logging.DEBUG)
self.driver = driver
self.handlers = {MSGTYPE_REQUEST_POS: self.handle_request_pos,
MSGTYPE_REQUEST_RANGE: self.handle_request_range,
MSGTYPE_REQUEST_VOXEL: self.handle_request_voxel,
MSGTYPE_VISIBLE_VOXEL_REQ: self.handle_visible_voxel_req,
MSGTYPE_MOVE_VECTOR: self.handle_move_vector}
self.sock = yatesock.YATESocket(handlers=self.handlers)
self.pool = eventlet.GreenPool(1000)
self.pool.spawn(self.do_ticks)
self.pool.spawn(self.do_vis_updates)
def __init__(self,bind_ip='127.0.0.1',bind_port=0,handlers={},enable_null_handle=True):
""" handlers is a dict mapping message type integers to functions that take the params (msg_params,msg_id,from_addr,sock)
enable_null_handle enables a default "null handler" that does nothing with unhandled message types except logging them to debug
"""
self.sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.sock.bind((bind_ip,bind_port))
yatelog.info('YATESock','Bound %s:%s' % self.sock.getsockname())
yatelog.info('YATESock','Setting up handlers and queues')
self.pool = eventlet.GreenPool(1000)
self.in_queues = {} # packets coming in from remote peer go here after parsing, each message type has an independent queue so we can do QoS-type stuff
self.out_queues = {} # packets going out to remote peer go here
self.parse_q = eventlet.queue.LightQueue(0) # to keep up performance, packets go here before parsing
self.handlers = {MSGTYPE_CONNECT: self.handle_connect, # a couple of standard message handlers, override by passing in new handlers
MSGTYPE_UNKNOWN_PEER: self.handle_unknown_peer,
MSGTYPE_CONNECT_ACK: self.handle_connect_ack,
MSGTYPE_KEEPALIVE: self.handle_keepalive,
MSGTYPE_KEEPALIVE_ACK:self.handle_keepalive_ack}
self.handlers.update(handlers)
self.enable_null_handle = enable_null_handle
self.active = True
for x in xrange(10): self.pool.spawn_n(self.parser_thread)
for k,v in msgtype_str.items():
self.in_queues[k] = eventlet.queue.LightQueue(0)
self.out_queues[k] = eventlet.queue.LightQueue(0)
setattr(self,'send_%s' % v[8:].lower(),YATESockSendMethod(k,self)) # black magic
for x in xrange(2): self.pool.spawn_n(self.msg_sender_thread,k)
for x in xrange(2): self.pool.spawn_n(self.msg_reader_thread,k)
if enable_null_handle:
if k not in self.handlers: self.handlers[k] = self.null_handler
self.known_peers = set() # if this is a server, this set contains the clients; if it's a client, it contains only one member - the server
self.last_pack = {} # store the timestamp of the last packet from a particular peer so we can do timeouts
self.pool.spawn_n(self.recv_thread)
self.pool.spawn_n(self.timeout_thread) # timeout peers all in a central location, giving plenty of time for them to send packets and not timeout
def __init__(self,endpoint=None,protocol_version=packets.default_protocol_version,protocol_mode=0,handlers={},display_name='YATEBot'):
""" endpoint is a tuple of (ip,port) or None - if None, use connect_to() later
protocol_version is the version of the minecraft protocol to use
protocol_mode should be 0 at start, but if you're a psycho you can of course set it to ANYTHING you want - think of the possibilities
handlers maps packet names to handlers that accept the packet data - it's up to the handler to decode the packet at present
display_name is what it sounds like
despite this thing being in eventlet, it's pretty much blocking - because notch owes me now, also it's a TCP socket and there's probably ordering issues
"""
self.endpoint = endpoint
self.protocol_version = protocol_version
self.protocol_mode = protocol_mode
self.display_name = display_name
self.compression_threshold = 0
self.compression_enabled = False
self.handlers = {'login_set_compression':self.handle_login_set_compression,
'keep_alive': self.handle_keep_alive,
'set_compression': self.handle_set_compression}
self.handlers.update(handlers)
self.cipher = crypto.Cipher()
self.pool = eventlet.GreenPool(1000)
self.ready = False
self.blocking_handlers = False # if set to True, packet handlers will be invoked by the reader thread
for k,v in packets.packet_idents.items():
if k[0]==self.protocol_version:
setattr(self,'send_%s' % k[3],MCSendMethod(k[3],self))
if endpoint is not None:
self.connect_to(endpoint)
def crawl(start_url):
"""Recursively crawl starting from *start_url*. Returns a set of
urls that were found."""
pool = eventlet.GreenPool()
seen = set()
fetch(start_url, seen, pool)
pool.waitall()
return seen
def test_spawn(self):
p = eventlet.GreenPool(4)
waiters = []
for i in range(10):
waiters.append(p.spawn(passthru, i))
results = [waiter.wait() for waiter in waiters]
self.assertEqual(results, list(range(10)))
def test_spawn_n(self):
p = eventlet.GreenPool(4)
results_closure = []
def do_something(a):
eventlet.sleep(0.01)
results_closure.append(a)
for i in range(10):
p.spawn_n(do_something, i)
p.waitall()
self.assertEqual(results_closure, list(range(10)))
def test_waiting(self):
pool = eventlet.GreenPool(1)
done = eventlet.Event()
def consume():
done.wait()
def waiter(pool):
gt = pool.spawn(consume)
gt.wait()
waiters = []
self.assertEqual(pool.running(), 0)
waiters.append(eventlet.spawn(waiter, pool))
eventlet.sleep(0)
self.assertEqual(pool.waiting(), 0)
waiters.append(eventlet.spawn(waiter, pool))
eventlet.sleep(0)
self.assertEqual(pool.waiting(), 1)
waiters.append(eventlet.spawn(waiter, pool))
eventlet.sleep(0)
self.assertEqual(pool.waiting(), 2)
self.assertEqual(pool.running(), 1)
done.send(None)
for w in waiters:
w.wait()
self.assertEqual(pool.waiting(), 0)
self.assertEqual(pool.running(), 0)
def test_resize(self):
pool = eventlet.GreenPool(2)
evt = eventlet.Event()
def wait_long_time(e):
e.wait()
pool.spawn(wait_long_time, evt)
pool.spawn(wait_long_time, evt)
self.assertEqual(pool.free(), 0)
self.assertEqual(pool.running(), 2)
self.assert_pool_has_free(pool, 0)
# verify that the pool discards excess items put into it
pool.resize(1)
# cause the wait_long_time functions to return, which will
# trigger puts to the pool
evt.send(None)
eventlet.sleep(0)
eventlet.sleep(0)
self.assertEqual(pool.free(), 1)
self.assertEqual(pool.running(), 0)
self.assert_pool_has_free(pool, 1)
# resize larger and assert that there are more free items
pool.resize(2)
self.assertEqual(pool.free(), 2)
self.assertEqual(pool.running(), 0)
self.assert_pool_has_free(pool, 2)
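# Quick standalone recap of the sizing API the last two tests exercise:
# free() counts available slots, running() counts live greenthreads,
# and resize() changes the cap in place.
import eventlet

pool = eventlet.GreenPool(2)
evt = eventlet.Event()
pool.spawn(evt.wait)                # occupy one slot
print(pool.running(), pool.free())  # -> 1 1
pool.resize(5)                      # grow the pool in place
print(pool.free())                  # -> 4
evt.send(None)
pool.waitall()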