def run(self, seedfile, progress_queue, output_queue):
    """Process every line of *seedfile* concurrently in a pool of size ``self.pool_size``.

    Each line is passed to ``self.pool_task_with_timeout``. The return value of
    every successful task is forwarded to ``self.callback`` (when it is not
    None). After every task finishes, successfully or not, a
    ``(process_name, finished_count, task_total)`` tuple is put on
    *progress_queue*. For the rest of this process, ``sys.stdout`` is replaced
    by ``ProcessIO(output_queue)``.

    :param seedfile: path of a text file; one task per line.
    :param progress_queue: queue that receives the progress tuples.
    :param output_queue: queue wrapped by ``ProcessIO`` to capture prints.
    """
    # NOTE(review): presumably the number of lines in seedfile — confirm.
    task_total = count_file_linenum(seedfile)
    proc_name = current_process().name
    sys.stdout = ProcessIO(output_queue)

    def progress_tracking(greenlet):
        # A function attribute serves as a mutable counter, so no ``nonlocal``
        # is needed and this also works on Python 2.
        count = getattr(progress_tracking, 'count', 0) + 1
        setattr(progress_tracking, 'count', count)
        progress_queue.put((proc_name, count, task_total))
        return greenlet

    callback = self.callback

    def deliver_result(greenlet):
        # Mirrors Pool.apply_async(callback=...): only the return value of a
        # successful task is handed to the callback.
        callback(greenlet.value)

    po = pool.Pool(self.pool_size)
    with open(seedfile) as f:
        for line in f:
            # Pool.spawn waits for a free slot (back-pressure) and registers
            # the greenlet with the pool exactly once, so po.join() waits for
            # it. The former apply_async(...) followed by po.add(g) registered
            # the task with the pool a second time. When the pool was full,
            # apply_async also handed back a helper greenlet outside the pool.
            g = po.spawn(self.pool_task_with_timeout, line)
            if callback is not None:
                g.link_value(deliver_result)
            g.link(progress_tracking)
    try:
        po.join()
    except (KeyboardInterrupt, SystemExit) as ex:
        print(str(ex))
        po.kill()
# python类Pool()的实例源码 (Python Pool() class: example source code)
def gevent(app, address, **options):
    """Serve the WSGI *app* on *address* with gevent's pywsgi server.

    ``options['options']`` must provide a ``workers`` attribute. When it is
    truthy, requests are handled in a ``gevent.pool.Pool`` of that size;
    otherwise the server's ``'default'`` spawning strategy is used. Request
    logging is disabled. This call blocks in ``serve_forever()``.
    """
    options = options['options']
    workers = options.workers
    from gevent import pywsgi
    from gevent.pool import Pool
    # Avoid ``workers and Pool(n) or 'default'``: a freshly created Pool
    # tracks no greenlets, so len() == 0 makes it falsy. That expression
    # therefore always evaluated to 'default' and ignored ``workers``.
    spawn = Pool(int(workers)) if workers else 'default'
    pywsgi.WSGIServer(address, app, spawn=spawn, log=None).serve_forever()
def __init__(self, listener, locals=None, banner=None, **server_args):
    """Create the server on *listener*; extra keyword arguments go to ``StreamServer``.

    :keyword locals: If given, a dictionary of "builtin" values that will be available
        at the top-level.
    :keyword banner: If given, a string that will be printed to each connecting user.
    """
    # Pool() without a size: no limit on the number of connections.
    connection_pool = Pool(greenlet_class=_Greenlet_stdreplace)
    StreamServer.__init__(self, listener, spawn=connection_pool, **server_args)
    namespace = dict(__doc__=None, __name__='__console__')
    if locals:
        # Caller-supplied names may override the two defaults above.
        namespace.update(locals)
    self.locals, self.banner, self.stderr = namespace, banner, sys.stderr
def use_gevent_with_queue():
    """Queue pages 1..6 and consume them with ``save_search_result_with_queue`` workers.

    Every 0.1 s another worker is spawned into a 5-slot pool, for as long as
    the pool reports a free slot. The call then blocks until the pool's
    workers have finished.
    """
    page_queue = Queue()
    workers = Pool(5)
    for page_no in range(1, 7):
        page_queue.put(page_no)
    # NOTE(review): termination depends on the worker: if it returns quickly
    # once the queue is empty, the pool may keep reporting free slots.
    while True:
        if not workers.free_count():
            break
        sleep(0.1)
        workers.spawn(save_search_result_with_queue, page_queue)
    workers.join()
def use_gevent_with_queue():
    """Enqueue pages 1..6 via ``put_new_page`` and consume them with pool workers.

    Every 0.1 s another ``save_search_result_with_queue`` worker is spawned
    into a 5-slot pool, for as long as the pool reports a free slot. The call
    then blocks until the pool's workers have finished.
    """
    page_queue = Queue()
    workers = Pool(5)
    for page_no in range(1, 7):
        put_new_page(page_no, page_queue)
    # NOTE(review): termination depends on the worker: if it returns quickly
    # once the queue is empty, the pool may keep reporting free slots.
    while True:
        if not workers.free_count():
            break
        sleep(0.1)
        workers.spawn(save_search_result_with_queue, page_queue)
    workers.join()
def gevent(app, address, **options):
    """Serve the WSGI *app* on *address* with gevent's pywsgi server.

    ``options['options']`` must provide a ``workers`` attribute. When it is
    truthy, requests are handled in a ``gevent.pool.Pool`` of that size;
    otherwise the server's ``'default'`` spawning strategy is used. Request
    logging is disabled. This call blocks in ``serve_forever()``.
    """
    options = options['options']
    workers = options.workers
    from gevent import pywsgi
    from gevent.pool import Pool
    # Avoid ``workers and Pool(n) or 'default'``: a freshly created Pool
    # tracks no greenlets, so len() == 0 makes it falsy. That expression
    # therefore always evaluated to 'default' and ignored ``workers``.
    spawn = Pool(int(workers)) if workers else 'default'
    pywsgi.WSGIServer(address, app, spawn=spawn, log=None).serve_forever()
def serve_forever(listener):
    """Serve ``application`` on *listener* using a ``Pool()`` and no request log; blocks."""
    server = WSGIServer(listener, application, spawn=Pool(), log=None)
    server.serve_forever()
def __init__(self, queue, db_proxy_num, myip):
    """Create the crawl pool and remember the crawl inputs.

    :param queue: stored as ``self.queue``.
    :param db_proxy_num: stored as ``self.db_proxy_num``.
    :param myip: stored as ``self.myip``.
    """
    # Concurrency limit comes from THREADNUM, defined outside this block.
    self.crawl_pool = Pool(THREADNUM)
    self.queue, self.db_proxy_num, self.myip = queue, db_proxy_num, myip
def __init__(self, options):
    """Initialise from the *options* mapping.

    Reads ``threads_count`` (pool size), ``target`` (used to derive the
    output file name) and ``timeout``. The timeout is applied process-wide
    through ``socket.setdefaulttimeout``.
    """
    self.options = options
    self.blockip, self.keywords = None, False
    opts = self.options
    self.pool = Pool(opts['threads_count'])
    # e.g. target "a.b.com" -> "a_b_com.txt"
    self.document = '%s.txt' % opts['target'].replace('.', '_')
    socket.setdefaulttimeout(opts['timeout'])
# ????
def bulk_originate(self, request_uuid_list):
    """Spawn one ``self.spawn_originate`` greenlet per request UUID.

    The greenlets are started but not joined; the method returns right after
    scheduling them.

    :param request_uuid_list: request UUIDs to originate.
    :returns: True when at least one UUID was given; otherwise logs an error
        and returns False.
    """
    if not request_uuid_list:
        self.log.error("BulkCall Failed -- No RequestUUID !")
        return False
    self.log.info("BulkCall for RequestUUIDs %s" % str(request_uuid_list))
    # Pool sized to the number of requests, presumably so no spawn below
    # has to wait for a free slot.
    job_pool = pool.Pool(len(request_uuid_list))
    # A plain loop: the spawned greenlets are not collected, so the former
    # throwaway list comprehension only obscured that this is side-effect code.
    for request_uuid in request_uuid_list:
        job_pool.spawn(self.spawn_originate, request_uuid)
    return True
def __init__(self, worker_func, task_count, worker_count=-1):
    """Create the completion-task worker pool and store the task settings.

    :param worker_func: callback function stored as ``self.worker_func``.
    :param task_count: requested number of tasks, stored as ``self.task_count``.
    :param worker_count: pool size; the default -1 means ``cpu_count()``.
    """
    size = cpu_count() if worker_count == -1 else worker_count
    self.worker_pool = pool.Pool(size=size)
    self.worker_pool_closed = False
    self.task_count, self.worker_func = task_count, worker_func
def gevent(app, address, **options):
    """Serve the WSGI *app* on *address* with gevent's pywsgi server.

    ``options['options']`` must provide a ``workers`` attribute. When it is
    truthy, requests are handled in a ``gevent.pool.Pool`` of that size;
    otherwise the server's ``'default'`` spawning strategy is used. Request
    logging is disabled. This call blocks in ``serve_forever()``.
    """
    options = options['options']
    workers = options.workers
    from gevent import pywsgi
    from gevent.pool import Pool
    # Avoid ``workers and Pool(n) or 'default'``: a freshly created Pool
    # tracks no greenlets, so len() == 0 makes it falsy. That expression
    # therefore always evaluated to 'default' and ignored ``workers``.
    spawn = Pool(int(workers)) if workers else 'default'
    pywsgi.WSGIServer(address, app, spawn=spawn, log=None).serve_forever()
def test_concurrency_with_delayed_url(self):
    """n delayed fetches mapped over an n-slot pool must take less than n seconds in total."""
    handler = DownloadHandler(self.spider, self.driver, self.driver_sem)
    n = 5
    workers = Pool(n)
    urls = [HTTPBIN_URL + '/delay/1' for _ in range(n)]
    started = time.time()
    workers.map(handler.fetch, [Request(url) for url in urls])
    elapsed = time.time() - started
    # Each /delay/1 request presumably takes about a second, so a total under
    # n seconds means the fetches overlapped.
    self.assertLess(elapsed, n)
def test_dynamic_request_concurrency(self):
    """n dynamic requests (wait=5) mapped over an n-slot pool must take more than n seconds.

    A fresh PhantomJS driver is created for this test and is always closed,
    even when the assertion fails.
    """
    self.driver = webdriver.PhantomJS()
    try:
        dh = DownloadHandler(self.spider, self.driver, self.driver_sem)
        n = 5
        pool = Pool(n)
        urls = [HTTPBIN_URL + '/delay/1' for _ in range(n)]
        time1 = time.time()
        pool.map(dh.fetch, [Request(url, dynamic=True, wait=5) for url in urls])
        self.assertGreater(time.time() - time1, n)
    finally:
        # Previously a failing assertion skipped close() and leaked the browser.
        self.driver.close()
def run(self, proxylist):
    """Validate *proxylist* concurrently and return the entries that pass.

    ``self.valid`` is mapped over the proxies through a pool of
    ``VALIDATE_CONFIG['THREAD_NUM']`` workers, and falsy results are dropped.
    The remaining results are stored on ``self.result`` and returned.

    :param proxylist: proxies to validate; empty or None returns ``[]``
        without touching ``self.result``.
    :returns: a list of truthy validation results.
    """
    if not proxylist:
        return []
    pool = Pool(VALIDATE_CONFIG['THREAD_NUM'])
    # Materialise a list: on Python 3, filter() returns a one-shot iterator,
    # so self.result would be empty once a caller had iterated it.
    self.result = [item for item in pool.map(self.valid, proxylist) if item]
    return self.result
def run_task_in_gevent(url_list, poc_file_dict):  # url_list: target URLs to scan
    """Run every PoC file against every target, with at most 100 checks at once.

    :param url_list: target URLs; falsy entries are skipped. Each target is
        normalised once with ``fix_target``.
    :param poc_file_dict: mapping of plugin type -> iterable of PoC files;
        falsy file entries are skipped.

    Blocks until every spawned ``poc.poc_verify`` call has finished.
    """
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        if not target:
            continue
        # Normalise once per target. Previously fix_target() was re-applied to
        # the already-fixed value for every PoC file, which is only harmless
        # if fix_target is idempotent.
        fixed_target = fix_target(target)
        for plugin_type, poc_files in poc_file_dict.items():
            for poc_file in poc_files:
                if poc_file:
                    # pool.spawn waits for a free slot, so at most 100 checks
                    # run concurrently. gevent.spawn + pool.add started each
                    # greenlet before the pool limit was applied.
                    pool.spawn(poc.poc_verify, fixed_target, plugin_type, poc_file)
    pool.join()
def gevent(app, address, **options):
    """Serve the WSGI *app* on *address* with gevent's pywsgi server.

    ``options['options']`` must provide a ``workers`` attribute. When it is
    truthy, requests are handled in a ``gevent.pool.Pool`` of that size;
    otherwise the server's ``'default'`` spawning strategy is used. Request
    logging is disabled. This call blocks in ``serve_forever()``.
    """
    options = options['options']
    workers = options.workers
    from gevent import pywsgi
    from gevent.pool import Pool
    # Avoid ``workers and Pool(n) or 'default'``: a freshly created Pool
    # tracks no greenlets, so len() == 0 makes it falsy. That expression
    # therefore always evaluated to 'default' and ignored ``workers``.
    spawn = Pool(int(workers)) if workers else 'default'
    pywsgi.WSGIServer(address, app, spawn=spawn, log=None).serve_forever()
def new_parallel(self, function, *params):
    '''
    Submit *function* to the shared parallel pool, creating the pool on first use.

    The pool (size 500) is a multiprocessing ThreadPool when ``core_type`` is
    'thread', and a gevent Pool otherwise.
    '''
    if self.ppool is None:
        if core_type != 'thread':
            from gevent.pool import Pool as pool_factory
        else:
            from multiprocessing.pool import ThreadPool as pool_factory
        self.ppool = pool_factory(500)
    # NOTE(review): *params is unpacked into apply_async's own (args, kwds,
    # callback) slots, so callers apparently must pass the call arguments as
    # one tuple, e.g. new_parallel(f, (a, b)) — confirm against callers.
    self.ppool.apply_async(function, *params)
def run_bugscan(url_list, plugins_dir='D:\\Projects\\xlcscan\\tools\\pocs\\'):
    """Run every plugin in ``bugscan_name_list`` against every target URL.

    At most 100 checks run concurrently; the call blocks until all have
    finished.

    :param url_list: target URLs; falsy entries are skipped. Each target is
        normalised once with ``fix_target``.
    :param plugins_dir: directory containing the ``bugscan`` plugin folder.
        Defaults to the previously hard-coded path.
    """
    import os
    from tools.pocs.bugscan import Bugscan
    poc = Bugscan()
    pool = Pool(100)
    for target in url_list:
        if not target:
            continue
        # Normalise once per target, not once per plugin (fix_target was
        # previously re-applied to its own output).
        fixed_target = fix_target(target)
        for poc_name in bugscan_name_list:
            if poc_name:
                poc_path = os.path.join(plugins_dir, 'bugscan', poc_name)
                # pool.spawn waits for a free slot, so the 100-greenlet limit
                # applies; gevent.spawn + pool.add started greenlets first.
                pool.spawn(poc.run, fixed_target, poc_path)
    pool.join()
def run_task_in_gevent(url_list, poc_file_dict):  # url_list: target URLs to scan
    """Run every PoC file against every target, with at most 100 checks at once.

    :param url_list: target URLs; falsy entries are skipped. Each target is
        normalised once with ``fix_target``.
    :param poc_file_dict: mapping of plugin type -> iterable of PoC files;
        falsy file entries are skipped.

    Blocks until every spawned ``poc.poc_verify`` call has finished.
    """
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        if not target:
            continue
        # Normalise once per target. Previously fix_target() was re-applied to
        # the already-fixed value for every PoC file, which is only harmless
        # if fix_target is idempotent.
        fixed_target = fix_target(target)
        for plugin_type, poc_files in poc_file_dict.items():
            for poc_file in poc_files:
                if poc_file:
                    # pool.spawn waits for a free slot, so at most 100 checks
                    # run concurrently. gevent.spawn + pool.add started each
                    # greenlet before the pool limit was applied.
                    pool.spawn(poc.poc_verify, fixed_target, plugin_type, poc_file)
    pool.join()