def testBenchmark():
import time
def printThreadNum():
import gc
from greenlet import greenlet
objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
print "Greenlets: %s" % len(objs)
printThreadNum()
test = TestNoblock()
s = time.time()
for i in range(3):
gevent.spawn(test.count, i + 1)
print "Created in %.3fs" % (time.time() - s)
printThreadNum()
time.sleep(5)
Python gevent.spawn() example source code
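# A minimal, self-contained sketch of the pattern shared by the snippets below:
# spawn greenlets with gevent.spawn() and wait for them with gevent.joinall().
# The worker() function here is illustrative only, not taken from any snippet.
import gevent

def worker(n):
    gevent.sleep(0.1)   # yield to the hub, simulating I/O
    return n * n

jobs = [gevent.spawn(worker, i) for i in range(5)]
gevent.joinall(jobs, timeout=5)
print [job.value for job in jobs]   # -> [0, 1, 4, 9, 16]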
def run(self):
c = self.client
if not c.login():
log.error('login failed')
return
symbols_list = self.split(self.symbols, self.size)
size = int(math.ceil(1. * len(symbols_list) / self.core))
child_sl = self.split(symbols_list, size)
f = open(self.out, 'ab') if self.out else None
ps, gs = [], []
for i in range(self.core):
r, w = gipc.pipe()
g = gevent.spawn(self.main_on_data, r, f)
gs.append(g)  # collect the reader greenlets so they can be killed/joined below
p = gipc.start_process(target=self.spawn_watchs, args=(w, child_sl[i]))
ps.append(p)
for p in ps:
p.join()
for g in gs:
g.kill()
g.join()
def gevent_queue(q,msg_queue):
while True:
try:
msg = msg_queue.get(block=True)
log.debug("PID:%d gevent queue start---------------------->" % os.getpid())
if TEST_PROCESS_NUM > 1 and msg == "OK":
for i in range(TEST_PROCESS_NUM-1):
msg_queue.put(os.getpid())
log.debug("PID:%d gevent queue call other processes----" % os.getpid())
glist = []
for i in range(GEVENT_NUM):
glist.append(gevent.spawn(verify_ip_in_queues,q))
gevent.joinall(glist)
l = msg_queue.qsize()
for i in range(l):
msg_queue.get()
log.debug("PID:%d gevent queue end<----------------------" % os.getpid())
except Exception as e:
log.error("PID:%d gevent_queue error:%s" % (os.getpid(),e.message))
def start(self):
"""Start the Engine greenlets.
Returns:
[Greenlet]: A list of Greenlets to be joined
"""
if self._started:
return []
self._started = True
self._leaser_loop = gevent.spawn(self.lease.acquire)
self._runner_loop = gevent.spawn(self._run)
self._machine_loop = gevent.spawn(self._machine_heartbeat)
return [self._machine_loop, self._runner_loop, self._leaser_loop]
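# Hedged usage sketch for start() above (assumes an `engine` instance of this
# class already exists; the name is a placeholder): join the returned greenlets
# so the caller blocks for as long as the engine runs.
greenlets = engine.start()
gevent.joinall(greenlets)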
def fetch_multiple_urls_async(req_data):
start_time = time_ms()
# start the threads (greenlets)
threads_ = []
for u in req_data:
new_thread = gevent.spawn(fetch_url_async, u)
threads_.append(new_thread)
# wait for threads to finish
gevent.joinall(threads_)
# retrieve threads return values
results = []
for t in threads_:
rresult = t.get(block=True, timeout=6.0)
rresult['start_time'] = start_time
results.append(rresult)
return results
# process a batch of responses
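# Hedged sketch of a `fetch_url_async` compatible with the collector above; the
# real implementation is not shown in this snippet. It assumes each item in
# req_data is a plain URL string and that `requests` (with gevent monkey-patching
# done elsewhere) is available. It must return a dict, because the caller
# attaches 'start_time' to the result.
import requests

def fetch_url_async(url):
    resp = requests.get(url, timeout=5)
    return {'url': url, 'status': resp.status_code, 'length': len(resp.content)}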
def connect(self):
self.log("Connecting...")
self.type = "out"
if self.ip.endswith(".onion"):
if not self.server.tor_manager or not self.server.tor_manager.enabled:
raise Exception("Can't connect to onion addresses, no Tor controller present")
self.sock = self.server.tor_manager.createSocket(self.ip, self.port)
else:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ip, int(self.port)))
# Implicit SSL
if self.cert_pin:
self.sock = CryptConnection.manager.wrapSocket(self.sock, "tls-rsa", cert_pin=self.cert_pin)
self.sock.do_handshake()
self.crypt = "tls-rsa"
self.sock_wrapped = True
# Detect protocol
self.send({"cmd": "handshake", "req_id": 0, "params": self.getHandshakeInfo()})
event_connected = self.event_connected
gevent.spawn(self.messageLoop)
return event_connected.get() # Wait for handshake
# Handle incoming connection
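# Hedged sketch of the wait-for-handshake pattern used in connect() above:
# `event_connected` behaves like gevent.event.AsyncResult (an assumption about
# the surrounding class), so one greenlet blocks on .get() until another calls .set().
import gevent
from gevent.event import AsyncResult

result = AsyncResult()

def message_loop():
    gevent.sleep(0.1)            # stand-in for reading the handshake reply
    result.set("handshake-ok")   # unblocks every waiter of result.get()

gevent.spawn(message_loop)
print result.get()               # blocks until message_loop() calls set()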
def testBlocking(self):
obj1 = ExampleClass()
obj2 = ExampleClass()
# Don't allow another call while it's already running; extra calls simply wait for the running one to finish
threads = [
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj2.countBlocking)
]
assert obj2.countBlocking() == "counted:5" # The call is ignored because obj2.countBlocking is already counting, but it blocks until that finishes
gevent.joinall(threads)
assert [thread.value for thread in threads] == ["counted:5","counted:5","counted:5","counted:5"] # Check the return value for every call
obj2.countBlocking() # Allowed to call again because obj2.countBlocking has finished
assert obj1.counted == 5
assert obj2.counted == 10
def updater(self, peers_try, queried, since):
while 1:
if not peers_try or len(queried) >= 3: # Stop after 3 successful queries
break
peer = peers_try.pop(0)
if not peer.connection and len(queried) < 2:
peer.connect() # Only open new connection if less than 2 queried already
if not peer.connection or peer.connection.handshake.get("rev", 0) < 126:
continue # Not compatible
res = peer.listModified(since)
if not res or "modified_files" not in res:
continue # Failed query
queried.append(peer)
for inner_path, modified in res["modified_files"].iteritems(): # Check if the peer has newer files than we do
content = self.content_manager.contents.get(inner_path)
if (not content or modified > content["modified"]) and inner_path not in self.bad_files:
self.log.debug("New modified file from %s: %s" % (peer, inner_path))
# We don't have this file, or ours is older
self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1 # Mark as bad file
gevent.spawn(self.downloadContent, inner_path) # Download the content.json + the changed files
# Check modified content.json files from peers and add modified files to bad_files
# Return: Successfully queried peers [Peer, Peer...]
def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
if isAllowed(event, allowed_again): # Not called recently, call it now
called(event)
# print "Calling now"
return gevent.spawn(func, *args, **kwargs)
else: # Called recently, schedule it for later
time_left = allowed_again - max(0, time.time() - called_db[event])
log.debug("Added to queue (%.2fs left): %s " % (time_left, event))
if not queue_db.get(event): # Function call not queued yet
thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later
queue_db[event] = (func, args, kwargs, thread)
return thread
else: # Function call already queued, just update the parameters
thread = queue_db[event][3]
queue_db[event] = (func, args, kwargs, thread)
return thread
# Rate limit and delay function call if needed
# Return: Wait for execution/delay then return value
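# Hedged usage sketch for callAsync() above; `announce` and the "announce" event
# key are made-up placeholders. The first call runs immediately; repeat calls
# within the 10-second window are coalesced into one deferred call.
def announce():
    return "announced"

thread = callAsync("announce", 10, announce)
value = thread.get()   # wait for the (possibly delayed) call and read its return value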
def testBlocking():
test = Test()
test2 = Test()
print "Counting..."
print "Creating class1/thread1"
thread1 = gevent.spawn(test.count)
print "Creating class1/thread2 (ignored)"
thread2 = gevent.spawn(test.count)
print "Creating class2/thread3"
thread3 = gevent.spawn(test2.count)
print "Joining class1/thread1"
thread1.join()
print "Joining class1/thread2"
thread2.join()
print "Joining class2/thread3"
thread3.join()
print "Creating class1/thread4 (its finished, allowed again)"
thread4 = gevent.spawn(test.count)
print "Joining thread4"
thread4.join()
print thread1.value, thread2.value, thread3.value, thread4.value
print "Done."
def main(self):
logging.info("Version: %s r%s, Python %s, Gevent: %s" % (config.version, config.rev, sys.version, gevent.__version__))
global ui_server, file_server
from File import FileServer
from Ui import UiServer
logging.info("Creating FileServer....")
file_server = FileServer()
logging.info("Creating UiServer....")
ui_server = UiServer()
logging.info("Removing old SSL certs...")
from Crypt import CryptConnection
CryptConnection.manager.removeCerts()
logging.info("Starting servers....")
gevent.joinall([gevent.spawn(ui_server.start), gevent.spawn(file_server.start)])
# Site commands
def checkSites(self):
if self.port_opened is None: # Test and open port if not tested yet
if len(self.sites) <= 2: # Faster announce on first startup
for address, site in self.sites.items():
gevent.spawn(self.checkSite, site)
self.openport()
if not self.port_opened:
self.tor_manager.startOnions()
self.log.debug("Checking sites integrity..")
for address, site in self.sites.items(): # Check sites integrity
gevent.spawn(self.checkSite, site) # Check in new thread
time.sleep(2) # Avoid sending requests too quickly
site = None
# Announce sites every 20 min
def start(self, check_sites=True):
self.sites = SiteManager.site_manager.list()
self.log = logging.getLogger("FileServer")
if config.debug:
# Auto reload FileRequest on change
from Debug import DebugReloader
DebugReloader(self.reload)
if check_sites: # Open port, Update sites, Check files integrity
gevent.spawn(self.checkSites)
thread_announce_sites = gevent.spawn(self.announceSites)
thread_wakeup_watcher = gevent.spawn(self.wakeupWatcher)
ConnectionServer.start(self)
# thread_wakeup_watcher.kill(exception=Debug.Notify("Stopping FileServer"))
# thread_announce_sites.kill(exception=Debug.Notify("Stopping FileServer"))
self.log.debug("Stopped.")
def test_lock_out_of_context_single(self):
r, w = pipe()
g = gevent.spawn(lambda r: r.get(), r)
gevent.sleep(SHORTTIME)
with raises(GIPCLocked):
with r:
pass
# The context manager can't close `r`, as it is locked in `g`.
g.kill(block=False)
# Ensure killing via 'context switch', i.e. yield control to other
# coroutines (otherwise the subsequent close attempt will fail with
# `GIPCLocked` error).
gevent.sleep(-1)
# Close the writer first. Otherwise, `os.close(r._fd)` would block on Windows.
w.close()
r.close()
def test_lock_out_of_context_pair(self):
with raises(GIPCLocked):
with pipe(True) as (h1, h2):
# Write more to pipe than pipe buffer can hold
# (makes `put` block when there is no reader).
# Buffer is quite large on Windows.
gw = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h1)
gevent.sleep(SHORTTIME)
# The context manager tries to close the h2 reader, h2 writer, and
# h1 writer first. It fails on the last of these and must still close
# the h1 reader after that.
assert not h1._writer._closed
assert h1._reader._closed
assert h2._writer._closed
assert h2._reader._closed
# Kill greenlet (free lock on h1 writer), close h1 writer.
gw.kill(block=False)
gevent.sleep(-1)
h1.close()
assert h1._writer._closed
def test_lock_out_of_context_pair_3(self):
with raises(GIPCLocked):
with pipe(True) as (h1, h2):
gr1 = gevent.spawn(lambda h: h.get(), h1)
gr2 = gevent.spawn(lambda h: h.get(), h2)
gevent.sleep(SHORTTIME)
# Context succeeds closing h2 writer, fails upon closing h2
# reader. Proceeds closing h1 writer, succeeds, closes h1
# reader and fails.
assert not h2._reader._closed
assert not h1._reader._closed
assert h2._writer._closed
assert h1._writer._closed
gr1.kill(block=False)
gr2.kill(block=False)
gevent.sleep(-1)
h2.close()
h1.close()
def test_lock_out_of_context_pair_4(self):
with raises(GIPCLocked):
with pipe(True) as (h1, h2):
# Write more to pipe than pipe buffer can hold
# (makes `put` block when there is no reader).
# Buffer is quite large on Windows.
gw1 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h1)
gw2 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h2)
gevent.sleep(SHORTTIME)
# Context fails closing h2 writer, succeeds upon closing h2
# reader. Proceeds closing h1 writer, fails, closes h1
# reader and succeeds.
assert h2._reader._closed
assert h1._reader._closed
assert not h2._writer._closed
assert not h1._writer._closed
gw1.kill(block=False)
gw2.kill(block=False)
gevent.sleep(-1)
h2.close()
h1.close()
def test_whatever_1(self):
"""
From a writing child, fire into the pipe. In a greenlet in the parent,
receive one of these messages and return it to the main greenlet.
Expect message retrieval (child process creation) within a certain
timeout interval. Terminate the child process after retrieval.
"""
with pipe() as (r, w):
def readgreenlet(reader):
with gevent.Timeout(SHORTTIME * 5, False) as t:
m = reader.get(timeout=t)
return m
p = start_process(usecase_child_a, args=(w, ))
# Wait for process to send first message:
r.get()
# Second message must be available immediately now.
g = gevent.spawn(readgreenlet, r)
m = r.get()
assert g.get() == "SPLASH"
p.terminate()
p.join()
assert p.exitcode == -signal.SIGTERM
def _notification_processor():
while True:
(zk_path, command, value, version, max_wait_in_secs,
watch_type, notification_timestamp) = _NOTIFICATION_EVENT_QUEUE.get()
if zk_path == "kill":
_kill("Restart via kill api")
# ignore all notifications with an older version
if _is_older_version(zk_path, version, notification_timestamp):
continue
# TODO: we need to deal with it if the number of spawned greenlets
# becomes an issue.
gevent.spawn(_process_notification, command, value, version,
max_wait_in_secs, watch_type, zk_path,
notification_timestamp)
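# Hedged sketch addressing the TODO above (not part of the original code): cap
# the number of concurrently running notification handlers with a gevent Pool.
# The pool size of 100 is an arbitrary, assumed limit.
from gevent.pool import Pool

_NOTIFICATION_POOL = Pool(100)

# Inside the loop, instead of gevent.spawn(...):
# _NOTIFICATION_POOL.spawn(_process_notification, command, value, version,
#                          max_wait_in_secs, watch_type, zk_path,
#                          notification_timestamp)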
def ipCheck(self):
while(True):
db = DBHelper()
ids = db.getIds()
spawns = []
if ids:
# print len(ids)
logging.info("[+] there are {0} ip in database".format(len(ids)))
for id in ids:
ip = db.getIp(id[0])
# print ip
spawns.append(gevent.spawn(self.inspectIp, ip))
if len(spawns) >= 500:
gevent.joinall(spawns)
spawns = []
gevent.joinall(spawns)
else:
logging.info("[+] no ip in database")
# print 'no ip in database'
logging.info("[+] sleep now")
# print 'sleep now'
time.sleep(config.CHECK_INTERVAL)
def test_gevent1(self):
"""????????????"""
def foo():
_log.info('Running in foo')
gevent.sleep(0)
_log.info('Explicit context switch to foo again')
def bar():
_log.info('Explicit context to bar')
gevent.sleep(0)
_log.info('Implicit context switch back to bar')
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
def test_greenlet(self):
"""??????Greenlet????"""
class MyGreenlet(gevent.Greenlet):
def __init__(self, message, n):
super(MyGreenlet, self).__init__()
self.message = message
self.n = n
def _run(self):
print(self.message)
gevent.sleep(self.n)
g1 = MyGreenlet("Hi there111!", 1)
g1.start()
g2 = MyGreenlet("Hi there222!", 2)
g2.start()
gevent.joinall([g1, g2])
# def test_shutdown(self):
# def run_forever():
# _log.info('run_forever start..')
# gevent.sleep(1000)
# gevent.signal(signal.SIGQUIT, gevent.kill)
# thread = gevent.spawn(run_forever)
# thread.join()
def test_event(self):
"""????event???????????"""
evt = Event()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
_log.info('A: Hey wait for me, I have to do something')
gevent.sleep(3)
_log.info("Ok, I'm done")
evt.set()
def waiter():
'''After 3 seconds the get call will unblock'''
_log.info("I'll wait for you")
evt.wait() # blocking
_log.info("It's about time")
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
def test_queue(self):
"""???????????Queue"""
task_queue = Queue()
def worker(name):
while not task_queue.empty():
task = task_queue.get()
_log.info('Worker %s got task %s' % (name, task))
gevent.sleep(0)
_log.info('Quitting time!')
def boss():
for i in xrange(1,25):
task_queue.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])
def test_group(self):
def talk(msg):
for i in xrange(3):
print(msg)
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')
group = Group()
group.add(g1)
group.add(g2)
group.join()
group.add(g3)
group.join()
def test_pool(self):
"""?????"""
class SocketPool(object):
def __init__(self):
self.pool = Pool(1000)
self.pool.start()
def listen(self, socket):
while True:
socket.recv()
def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)
def shutdown(self):
self.pool.kill()
def test_local(self):
"""
??????
?????gevent?web???HTTP?????????????????gevent?
"""
stash = local()
def f1():
stash.x = 1
print(stash.x)
def f2():
stash.y = 2
print(stash.y)
try:
stash.x
except AttributeError:
print("x is not local to f2")
g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)
gevent.joinall([g1, g2])
def echo_worker(self):
""" The `echo_worker` works through the `self.received_transfers` queue and spawns
`self.on_transfer` greenlets for all not-yet-seen transfers. """
log.debug('echo worker', qsize=self.received_transfers.qsize())
while self.stop_signal is None:
if self.received_transfers.qsize() > 0:
transfer = self.received_transfers.get()
if transfer in self.seen_transfers:
log.debug(
'duplicate transfer ignored',
initiator=pex(transfer['initiator']),
amount=transfer['amount'],
identifier=transfer['identifier']
)
else:
self.seen_transfers.append(transfer)
self.greenlets.append(gevent.spawn(self.on_transfer, transfer))
else:
gevent.sleep(.5)
def handle_balance(self, state_change):
channel_address = state_change.channel_address
token_address = state_change.token_address
participant_address = state_change.participant_address
balance = state_change.balance
graph = self.raiden.token_to_channelgraph[token_address]
channel = graph.address_to_channel[channel_address]
channel.state_transition(state_change)
if channel.contract_balance == 0:
connection_manager = self.raiden.connection_manager_for_token(
token_address
)
gevent.spawn(
connection_manager.join_channel,
participant_address,
balance
)
def synchronous():
# Synchronizing greenlets with an Event
from gevent.event import Event
evt = Event()
def setter():
print('A: Hey wait for me, I have to do something')
gevent.sleep(3)
print('Ok, I\'m done')
evt.set()
def waiter():
print('I\'ll wait for you')
evt.wait()
print('It\'s about time')
gevent.joinall([gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])