def gevent_queue(q,msg_queue):
while True:
try:
msg = msg_queue.get(block=True)
log.debug("PID:%d gevent queue start---------------------->" % os.getpid())
if TEST_PROCESS_NUM > 1 and msg == "OK":
for i in range(TEST_PROCESS_NUM-1):
msg_queue.put(os.getpid())
log.debug("PID:%d gevent queue call other processes----" % os.getpid())
glist = []
for i in range(GEVENT_NUM):
glist.append(gevent.spawn(verify_ip_in_queues,q))
gevent.joinall(glist)
l = msg_queue.qsize()
for i in range(l):
msg_queue.get()
log.debug("PID:%d gevent queue end<----------------------" % os.getpid())
except Exception as e:
log.error("PID:%d gevent_queue error:%s" % (os.getpid(),e.message))
def fetch_multiple_urls_async(req_data):
start_time = time_ms()
# start the threads (greenlets)
threads_ = []
for u in req_data:
new_thread = gevent.spawn(fetch_url_async, u)
threads_.append(new_thread)
# wait for threads to finish
gevent.joinall(threads_)
# retrieve threads return values
results = []
for t in threads_:
rresult = t.get(block=True, timeout=6.0)
rresult['start_time'] = start_time
results.append(rresult)
return results
# process a batch of responses
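A minimal, self-contained sketch of the fan-out/collect pattern used above. fetch() here is only a stand-in for the fetch_url_async helper that the snippet assumes but does not show:

import random
import gevent


def fetch(url):
    # Stand-in for fetch_url_async (not shown in the snippet): pretend to do
    # some I/O, then return a result dict.
    gevent.sleep(random.uniform(0.1, 0.5))
    return {'url': url, 'status': 200}


def fetch_all(urls, timeout=6.0):
    jobs = [gevent.spawn(fetch, u) for u in urls]
    gevent.joinall(jobs, timeout=timeout)  # wait for all, with an overall cap
    # value is None for any greenlet that did not finish within the timeout
    return [job.value if job.successful() else None for job in jobs]


if __name__ == '__main__':
    print(fetch_all(['http://example.com/%d' % i for i in range(5)]))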
def testBlocking(self):
obj1 = ExampleClass()
obj2 = ExampleClass()
# Calling again while it's already running is not allowed; the call waits until the running one finishes
threads = [
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj1.countBlocking),
gevent.spawn(obj2.countBlocking)
]
assert obj2.countBlocking() == "counted:5" # The call is ignored as obj2.countBlocking is already counting, but it blocks until that call finishes
gevent.joinall(threads)
assert [thread.value for thread in threads] == ["counted:5","counted:5","counted:5","counted:5"] # Check the return value for every call
obj2.countBlocking() # Can be called again because the previous obj2.countBlocking call has finished
assert obj1.counted == 5
assert obj2.counted == 10
def main(self):
logging.info("Version: %s r%s, Python %s, Gevent: %s" % (config.version, config.rev, sys.version, gevent.__version__))
global ui_server, file_server
from File import FileServer
from Ui import UiServer
logging.info("Creating FileServer....")
file_server = FileServer()
logging.info("Creating UiServer....")
ui_server = UiServer()
logging.info("Removing old SSL certs...")
from Crypt import CryptConnection
CryptConnection.manager.removeCerts()
logging.info("Starting servers....")
gevent.joinall([gevent.spawn(ui_server.start), gevent.spawn(file_server.start)])
# Site commands
def ipCheck(self):
while(True):
db = DBHelper()
ids = db.getIds()
spawns = []
if ids:
# print len(ids)
logging.info("[+] there are {0} ip in database".format(len(ids)))
for id in ids:
ip = db.getIp(id[0])
# print ip
spawns.append(gevent.spawn(self.inspectIp, ip))
if len(spawns) >= 500:
gevent.joinall(spawns)
spawns = []
gevent.joinall(spawns)
else:
logging.info("[+] no ip in database")
# print 'no ip in database'
logging.info("[+] sleep now")
# print 'sleep now'
time.sleep(config.CHECK_INTERVAL)
def test_gevent1(self):
"""????????????"""
def foo():
_log.info('Running in foo')
gevent.sleep(0)
_log.info('Explicit context switch to foo again')
def bar():
_log.info('Explicit context to bar')
gevent.sleep(0)
_log.info('Implicit context switch back to bar')
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
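Besides a plain list of greenlets, gevent.joinall() also accepts timeout, raise_error and count keyword arguments and returns the greenlets that finished in time. A small sketch with a made-up worker:

import gevent


def might_fail(n):
    gevent.sleep(n * 0.1)
    if n == 2:
        raise ValueError('boom')
    return n


jobs = [gevent.spawn(might_fail, n) for n in range(4)]

# With raise_error=True the first exception raised inside a greenlet is
# re-raised here instead of only being stored on greenlet.exception.
try:
    gevent.joinall(jobs, timeout=1.0, raise_error=True)
except ValueError as exc:
    print('a worker failed:', exc)

# Without raise_error, joinall() simply returns the greenlets that finished
# before the timeout expired.
done = gevent.joinall(jobs, timeout=1.0)
print(len(done), 'greenlets finished')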
def test_greenlet(self):
"""??????Greenlet????"""
class MyGreenlet(gevent.Greenlet):
def __init__(self, message, n):
super(MyGreenlet, self).__init__()
self.message = message
self.n = n
def _run(self):
print(self.message)
gevent.sleep(self.n)
g1 = MyGreenlet("Hi there111!", 1)
g1.start()
g2 = MyGreenlet("Hi there222!", 2)
g2.start()
gevent.joinall([g1, g2])
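After joinall() returns, each greenlet's outcome can be inspected via ready(), successful(), value and exception. A short sketch (the worker function is invented for illustration; gevent also prints the failed greenlet's traceback to stderr):

import gevent


def worker(n):
    gevent.sleep(0.1)
    if n % 2:
        raise RuntimeError('odd input %d' % n)
    return n * n


jobs = [gevent.spawn(worker, n) for n in range(4)]
gevent.joinall(jobs)

for job in jobs:
    if job.successful():   # finished without raising
        print('result:', job.value)
    else:                  # finished but raised; the exception object is kept
        print('failed:', job.exception)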
# def test_shutdown(self):
# def run_forever():
# _log.info('run_forever start..')
# gevent.sleep(1000)
# gevent.signal(signal.SIGQUIT, gevent.kill)
# thread = gevent.spawn(run_forever)
# thread.join()
def test_event(self):
"""????event???????????"""
evt = Event()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
_log.info('A: Hey wait for me, I have to do something')
gevent.sleep(3)
_log.info("Ok, I'm done")
evt.set()
def waiter():
'''After 3 seconds the get call will unblock'''
_log.info("I'll wait for you")
evt.wait() # blocking
_log.info("It's about time")
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
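Closely related to Event is gevent.event.AsyncResult, which wakes its waiters with a value (or an exception) instead of a bare flag; a minimal sketch:

import gevent
from gevent.event import AsyncResult

result = AsyncResult()


def setter():
    gevent.sleep(1)
    result.set('token produced at last')  # wakes every waiter with this value


def waiter(name):
    print('%s got: %s' % (name, result.get()))  # blocks until set() is called


gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter, 'w1'),
    gevent.spawn(waiter, 'w2'),
])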
def test_queue(self):
"""???????????Queue"""
task_queue = Queue()
def worker(name):
while not task_queue.empty():
task = task_queue.get()
_log.info('Worker %s got task %s' % (name, task))
gevent.sleep(0)
_log.info('Quitting time!')
def boss():
for i in xrange(1,25):
task_queue.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])
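Checking task_queue.empty() only works here because the boss greenlet is joined before the workers start. When producers and consumers overlap, a JoinableQueue with task_done()/join() is more robust; a sketch:

import gevent
from gevent.queue import JoinableQueue, Empty

tasks = JoinableQueue()


def worker(name):
    while True:
        try:
            task = tasks.get(timeout=1)  # wait at most 1 s for new work
        except Empty:
            return
        print('Worker %s got task %s' % (name, task))
        gevent.sleep(0)
        tasks.task_done()                # mark the item as processed


def boss():
    for i in range(1, 25):
        tasks.put(i)
    tasks.join()                         # blocks until every item is task_done()
    print('All tasks dispatched and completed')


gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
])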
def test_local(self):
"""
Greenlet-local data.
In a gevent-based web server, per-request data (such as the HTTP request) can be kept in a local() so each handler greenlet only sees its own values.
"""
stash = local()
def f1():
stash.x = 1
print(stash.x)
def f2():
stash.y = 2
print(stash.y)
try:
stash.x
except AttributeError:
print("x is not local to f2")
g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)
gevent.joinall([g1, g2])
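A sketch of the per-request use the docstring above alludes to: every handler greenlet stashes its own data in the local() without seeing values set by concurrent handlers (handle() and request_id are illustrative names):

import gevent
from gevent.local import local

request_ctx = local()


def handle(request_id):
    request_ctx.id = request_id          # visible only to this greenlet
    gevent.sleep(0)                      # force a context switch mid-"request"
    assert request_ctx.id == request_id  # still our own value after switching


gevent.joinall([gevent.spawn(handle, i) for i in range(5)])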
def synchronous():
# Synchronize greenlets with an Event
from gevent.event import Event
evt = Event()
def setter():
print('A: Hey wait for me, I have to do something')
gevent.sleep(3)
print('Ok, I\'m done')
evt.set()
def waiter():
print('I\'ll wait for you')
evt.wait()
print('It\'s about time')
gevent.joinall([gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
def test_propagation_with_new_context(self):
# create multiple futures so that we expect multiple
# traces instead of a single one
ctx = Context(trace_id=100, span_id=101)
self.tracer.context_provider.activate(ctx)
def greenlet():
with self.tracer.trace('greenlet') as span:
gevent.sleep(0.01)
jobs = [gevent.spawn(greenlet) for x in range(1)]
gevent.joinall(jobs)
traces = self.tracer.writer.pop_traces()
eq_(1, len(traces))
eq_(1, len(traces[0]))
eq_(traces[0][0].trace_id, 100)
eq_(traces[0][0].parent_id, 101)
def handle(self, source, address):
init_data = source.recv(BUFFER_SIZE)
try:
if len(init_data) > 3 and init_data[:3] == b'GET':
source.sendall(b'HTTP/1.1 200 OK\r\n' + format_date_time(time.time()).encode() + b'\r\n\r\nOK')
return
else:
dest = create_connection(self.tcp_service)
except IOError as ex:
sys.stderr.write('Error creating connection: {}\n'.format(ex))
return
forwarders = (
gevent.spawn(forward, source, dest, self),
gevent.spawn(forward, dest, source, self),
)
gevent.joinall(forwarders)
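The forward() helper joined above is not shown in the snippet. A minimal sketch of what such a socket pump could look like, under the assumption that it simply copies bytes until one side closes (the server argument is accepted but unused here):

BUFFER_SIZE = 4096


def forward(source, dest, server=None):
    # Copy bytes from source to dest until source closes or errors out,
    # then close dest so the opposite-direction greenlet terminates too.
    try:
        while True:
            data = source.recv(BUFFER_SIZE)
            if not data:
                break
            dest.sendall(data)
    except IOError:
        pass
    finally:
        try:
            dest.close()
        except IOError:
            pass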
def handle(self, *args, **options):
self.lobby = options['lobby']
self.password = options['password']
bots_num = 9
bot_login = os.environ.get('BOT_LOGIN', '')
bot_password = os.environ.get('BOT_PASSWORD', '')
credentials = [
{
'login': '%s%d' % (bot_login, i),
'password': '%s%d' % (bot_password, i),
} for i in xrange(2, bots_num+2)
]
try:
gevent.joinall([
gevent.spawn(self.start_bot, c) for c in credentials
])
finally:
for bot in self.bots:
bot.exit()
bot.steam.logout()
def handle(self, *args, **options):
bots_num = options['number']
bot_login = os.environ.get('BOT_LOGIN', '')
bot_password = os.environ.get('BOT_PASSWORD', '')
credentials = [
{
'login': '%s%d' % (bot_login, i),
'password': '%s%d' % (bot_password, i),
} for i in xrange(1, bots_num+1)
]
try:
gevent.joinall([
gevent.spawn(self.start_bot, c) for c in credentials
])
finally:
for bot in self.bots:
bot.exit()
bot.steam.logout()
def __init__(self, game_id, boxscore=None, playbyplays=None):
self.game_id = game_id
self._boxscore = boxscore
self._playbyplay = playbyplays
global _async_fetch
if not self._boxscore or not self._playbyplay:
api = Api()
if not _async_fetch:
self._boxscore = api.GetBoxscore(game_id)
self._playbyplay = api.GetPlayByPlay(game_id)
self._boxscore_summary = api.GetBoxscoreSummary(game_id)
else:
box_job = gevent.spawn(api.GetBoxscore, game_id)
pbp_job = gevent.spawn(api.GetPlayByPlay, game_id)
bs_job = gevent.spawn(api.GetBoxscoreSummary, game_id)
gevent.joinall([box_job, pbp_job, bs_job])
self._boxscore = box_job.value
self._playbyplay = pbp_job.value
self._boxscore_summary = bs_job.value
self._matchups = None
def map(requests, prefetch=True, size=None):
"""Concurrently converts a list of Requests to Responses.
:param requests: a collection of Request objects.
:param prefetch: If False, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
"""
requests = list(requests)
pool = Pool(size) if size else None
jobs = [send(r, pool) for r in requests]
gevent.joinall(jobs)
if prefetch:
[r.response.content for r in requests]
return [r.response for r in requests]
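When throttling is the main concern, gevent.pool.Pool can replace the manual spawn/joinall bookkeeping: spawn() blocks while the pool is full and join() waits for everything spawned through it. A small sketch with a made-up work function:

import gevent
from gevent.pool import Pool


def work(i):
    gevent.sleep(0.1)  # stand-in for a network request
    return i * 2


pool = Pool(5)  # at most 5 greenlets run concurrently
jobs = [pool.spawn(work, i) for i in range(20)]
pool.join()     # here equivalent to gevent.joinall(jobs)
print([job.value for job in jobs])

# imap_unordered yields results as they complete, with the same throttling
for result in Pool(5).imap_unordered(work, range(20)):
    print(result)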
def get_pic(url, mm_type):
response = my_get(url)
i = 0
while "400" in bs(response.content, "lxml").title or response.status_code == 404 or response.status_code == 400:
i += 1
if i > 5:
return
time.sleep(0.8)
response = my_get(url)
li_soup = bs(response.content, "lxml")
title = li_soup.title.text.replace(' ', '-')
if li_soup.find(lambda tag: tag.name == 'a' and '下一页»' in tag.text) is None:
with open("log.txt", "a") as fs:
fs.write(url + "\r\n")
fs.write(str(response.status_code) + "\r\n")
fs.write(response.content + "\r\n")
print "error" + url
else:
total_page = int(li_soup.find(lambda tag: tag.name == 'a' and '下一页»' in tag.text) \
.find_previous_sibling().text)
tasks = [gevent.spawn(download_pic, url + "/" + str(page), title, mm_type, ) for page in
range(1, total_page + 1)]
gevent.joinall(tasks)
def get_weibo_users_timeline_async(self, id_str):
def get_timeline_data(api_account):
while not tasks.empty():
client = WeiboAPIService(appKey=api_account[1], appSecret=api_account[2], token=api_account[3])
id = tasks.get_nowait()
data.put_nowait(client.get_weibo_user_timeline(id))
result_data = []
data = Queue()
tasks = Queue()
for id in id_str.split(",")[0:10]:
tasks.put_nowait(id)
# Lazily load the Weibo API accounts on first use
if self.api_accounts is None:
self.api_accounts = self.weiboDAO.get_weibo_accounts()
threads = []
for account in self.api_accounts:
threads.append(gevent.spawn(get_timeline_data,account))
gevent.joinall(threads)
while not data.empty():
result_data.append(data.get_nowait())
return result_data
def mutiSearchPlace(self):
envelope=polygon_target.envelope
bounds=list(envelope.bounds)
# Slightly enlarge the envelope bounds
bounds[0] -= 0.02
parts = 50
# Split the extent into a grid of sub-bounds (processed in batches of 16 below)
boundsList = GeoUtil().getBoundsList(bounds, parts)
threads = []
# Iterate over the sub-bounds in batches of 16 greenlets to limit concurrency
for index in range(0, len(boundsList)/16+1, 1):
for threadIndex in range(index*16,(index+1)*16):
if threadIndex < len(boundsList):
print 'current bounds ...%s ' % threadIndex
subBounds = boundsList[threadIndex]
# Build a polygon from the sub-bounds extent
coords=GeoUtil().getPolygonByExtent(subBounds)
coords=tuple(coords)
isIntersects=Polygon((coords)).intersects(polygon_target)
if isIntersects:
threads.append(gevent.spawn(self.fetchPlaceDetail, threadIndex%16, subBounds))
gevent.joinall(threads)
def sync(value):
hifen = "-"*20
while count1 < value:
diffSyncandAsync(value,"sync")
print(hifen.center(40))
thread = [ gevent.spawn(diffSyncandAsync,count2,"async") for count2 in range(value)]
gevent.joinall(thread)
print(hifen.center(40))
threadlst = []
for count2 in range(value):
threadlst.append(count2)
for T in threadlst:
realthread = threading.Thread(target=diffSyncandAsync,args=[T,"async"])
realthread.start()
def multi_thread(func):
@wraps(func)
def wrapper(*args, **kwargs):
url,start,end = args
jobs= []
piece = (end-start)//THREAD_NUM
for _ in range(THREAD_NUM):
jobs.append(gevent.spawn(func,url,start,start+piece))
start += piece
gevent.joinall(jobs)
# get return value
# result = []
# for j in jobs:
# result.append(j.value)
# return result
return wrapper
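The commented-out lines hint at collecting per-slice return values. A variant sketch of the same decorator (explicit parameters instead of *args) that gathers job.value after joinall(); as in the snippet above, any remainder beyond THREAD_NUM * piece is left uncovered:

import gevent
from functools import wraps

THREAD_NUM = 4


def multi_thread(func):
    @wraps(func)
    def wrapper(url, start, end):
        jobs = []
        piece = (end - start) // THREAD_NUM
        for _ in range(THREAD_NUM):
            jobs.append(gevent.spawn(func, url, start, start + piece))
            start += piece
        gevent.joinall(jobs)
        # gather whatever each slice returned, in slice order
        return [job.value for job in jobs]
    return wrapper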
def run(self):
if self.install_path == 'all':
info_list = self.pkgCom.getPkgList()
else:
pkg_info = self.pkgCom.getPkgId(self.install_path)
if pkg_info:
info_list = {'pkg': [pkg_info]}
else:
return 1, "%s not a valid package" %(self.install_path)
t_list = []
# Currently only entries of type 'pkg' are handled
if 'pkg' in info_list:
pkg_info_list = info_list['pkg']
else:
pkg_info_list = []
for pkg_info in pkg_info_list:
t_list.append(gevent.spawn(self.clear_pkg, pkg_info))
gevent.joinall(t_list)
return 0, "ok"
def run(self):
if self.install_path == 'all':
info_list = self.pkgCom.getPkgList()
else:
pkg_info = self.pkgCom.getPkgId(self.install_path)
if pkg_info:
info_list = {'pkg': [pkg_info]}
else:
return 1, "%s not a valid package" %(self.install_path)
t_list = []
# Currently only entries of type 'pkg' are handled
if 'pkg' in info_list:
pkg_info_list = info_list['pkg']
else:
pkg_info_list = []
for pkg_info in pkg_info_list:
t_list.append(gevent.spawn(self.monitorPkg, pkg_info))
gevent.joinall(t_list)
return 0, "ok"
def handle_customize(self):
self.generate_uuid()
# self.inner_ip = self.getLocalIp()
# if not self.inner_ip:
# logger.error('not found local_ip, please restart agent')
# sys.exit(1)
server_groups = self.conf.get('report', 'server_groups')
job_list = []
job_list.append(gevent.spawn(self.localReport))
job_list.append(gevent.spawn(self.localJsonReport))
jobs = self.send_to_server_groups(server_groups, self.config["linger_ms"], self.config["max_queued_messages"])
job_list.extend(jobs)
gevent.joinall(job_list)
def store_worker():
"""
Periodically re-check the availability of every proxy already stored by the persister.
"""
while True:
all_proxies = persister.list(count='all', columns='all')
spawns = list()
for proxy in all_proxies:
if proxy['protocol'] == 'http':
spawns.append(gevent.spawn(availability.check.store_handle, 'http', proxy, persister))
else:
spawns.append(gevent.spawn(availability.check.store_handle, 'https', proxy, persister))
if len(spawns) == config.COROUTINE_NUM:
gevent.joinall(spawns)
spawns.clear()
gevent.joinall(spawns)
spawns.clear()
time.sleep(config.PROXY_STORE_CHECK_SEC)
def deploy_bidders(bidder_addrs, web3, auction, kwargs):
if auction.call().stage() != AUCTION_STARTED:
log.warning('requested bidders deployment, but auction is not started yet')
return
from deploy.bidder import Bidder
bidder_objs = []
for addr in bidder_addrs:
bidder = Bidder(web3, auction, addr)
bidder.max_bid_ceiling = kwargs['max_bid_ceiling']
bidder.bid_interval = kwargs['bid_interval']
bidder.max_bid_price = kwargs['max_bid_amount']
bidder.min_bid_price = kwargs['min_bid_amount']
bidder_objs.append(bidder)
for i in range(0, kwargs['wei_bidders']):
if i == 0:
bidder_objs[i].max_bids = 1
bidder_objs[i].max_bid_price = 1
bidder_objs[i].min_bid_price = 1
bidder_gevents = [gevent.spawn(b.run) for b in bidder_objs]
gevent.joinall(bidder_gevents)
def init_database():
print("--- Start getting listings ---")
listings_threads = [gevent.spawn(l.insert_listings, sample, DB, networking_pool)
for sample in SAMPLES]
gevent.joinall(listings_threads)
print("--- Get all listings: %s seconds ---"
% (time.time() - start_time))
listings_cursor = DB.listings.find({"reviews_count": {"$gt": 0}})
listings = [listing for listing in listings_cursor]
print("--- listings length %s ---" % str(len(listings)))
print("--- reviews length %s ---" % str())
print("--- Start getting reviews ---")
reviews_threads = [networking_pool.spawn(r.insert_reviews, listing['_id'], DB)
for listing in listings]
networking_pool.join()
print("--- Get all reviews: %s seconds ---"
% (time.time() - start_time))
def cli(log_level, live):
logging.basicConfig(
filename='arbloop.log',
format='[%(asctime)s] [%(levelname)s] %(message)s',
level=log_level
)
logging.info('Warming up traders ...')
gevent.signal(signal.SIGQUIT, gevent.kill)
workers = []
for product in config.TRADER_PRODUCTS or []:
trader = Trader(product=product, live=live)
workers.append(
gevent.spawn(trader.trade)
)
gevent.joinall(workers)
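A hedged sketch of shutting such long-running workers down cleanly: catch the interrupt around joinall() and kill whatever is still alive with gevent.killall(). trade_forever() and the product names are placeholders, not part of the project above:

import gevent


def trade_forever(product):
    # placeholder for Trader.trade: loop until killed
    while True:
        gevent.sleep(1)
        print('tick', product)


workers = [gevent.spawn(trade_forever, p) for p in ('product-a', 'product-b')]
try:
    gevent.joinall(workers)
except KeyboardInterrupt:
    # raises GreenletExit inside each worker and waits for them to exit
    gevent.killall(workers, block=True)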
def check_proxy(self):
"""return host is valid or not
"""
if not self.check_httpbin():
return
threads = []
self._before_check()
for index, url in enumerate(self.url_list):
threads.append(gevent.spawn(self._check, index, url))
gevent.joinall(threads)
self._after_check()