def execute(self, context, start_req, parser=_default_parser,
pool=None, pool_size=None):
"""
:param context: ?????
:param start_req: ??????
:param parser: Response?????
:param concurrent: ??????????
:param pool: ?????gevent pool
"""
if pool or pool_size:
# concurrent execution on a gevent pool
return self._concurrent_execute(
context, start_req, parser,
pool, pool_size)
else:
# synchronous execution
return self._sync_execute(context, start_req, parser)
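# Illustrative usage of execute(); the Engine/ctx/seed_requests names are
# assumptions, not part of the original snippet:
#
#     engine = Engine()
#     engine.execute(ctx, seed_requests)                 # synchronous run
#     engine.execute(ctx, seed_requests, pool_size=20)   # concurrent run, 20 greenlets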
def _concurrent_execute(self, context, start_req, parser, pool, pool_size):
queue = Queue()  # queue of pending requests
# seed the queue with the start requests
for r in start_req:
queue.put_nowait(r)
if pool is None:
pool = GeventPool(pool_size)
greenlets = []
while True:
try:
req = self._check_req(queue.get(timeout=1))
if req.parser is None:
req.parser = parser
greenlets.append(pool.spawn(req, context, queue))
except Empty:
break
return [greenlet.get() for greenlet in greenlets]
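# Sketch of the callable contract _concurrent_execute relies on: each request is
# spawned as req(context, queue) and may push follow-up requests back onto the
# shared queue. The Request class below is illustrative only (the real one is not
# shown); requests.get stands in for whatever transport the project actually uses.
import requests

class Request(object):
    def __init__(self, url, parser=None):
        self.url = url
        self.parser = parser  # filled in with the default parser by execute()

    def __call__(self, context, queue):
        response = requests.get(self.url)
        # the parser may yield follow-up Request objects discovered in the page
        for follow_up in self.parser(context, response):
            queue.put_nowait(follow_up)
        return response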
def test_pool(self):
"""?????"""
class SocketPool(object):
def __init__(self):
self.pool = Pool(1000)
def listen(self, socket):
while True:
socket.recv()
def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)
def shutdown(self):
self.pool.kill()
def map(requests, prefetch=True, size=None):
"""Concurrently converts a list of Requests to Responses.
:param requests: a collection of Request objects.
:param prefetch: If False, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
"""
requests = list(requests)
pool = Pool(size) if size else None
jobs = [send(r, pool) for r in requests]
gevent.joinall(jobs)
if prefetch:
[r.response.content for r in requests]
return [r.response for r in requests]
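# Illustrative call (the get() helper is one of the patched requests.api
# functions this module is assumed to provide, as in grequests):
#
#     reqs = (get(u) for u in ['http://example.com/a', 'http://example.com/b'])
#     responses = map(reqs, size=5)   # at most 5 downloads in flight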
def run_application(self):
upgrade_header = self.environ.get('HTTP_UPGRADE', '').lower()
if upgrade_header:
# Build and start the HTTP response
self.environ['ws4py.socket'] = self.socket or self.environ['wsgi.input'].rfile._sock
self.result = self.application(self.environ, self.start_response) or []
self.process_result()
del self.environ['ws4py.socket']
self.socket = None
self.rfile.close()
ws = self.environ.pop('ws4py.websocket', None)
if ws:
ws_greenlet = self.server.pool.track(ws)
# issue #170
# in gevent 1.1 socket will be closed once application returns
# so let's wait for websocket handler to finish
ws_greenlet.join()
else:
gevent.pywsgi.WSGIHandler.run_application(self)
def run(self):
# look up any previously generated report for this document
findreport(self.document)
# wildcard (pan-DNS) detection: resolve a random, non-existent subdomain;
# if it resolves, remember that IP so matching results can be treated as
# blocked and filtered out later
block_check_results = self.checkdomain('d312379196bd822558ca7dfb3c95ba61.'+self.options['target'],'block')
if block_check_results:
self.blockip = block_check_results[0]
# build the candidate subdomains from the dictionary file
dic_list = (dic.strip('\n')+'.'+self.options['target'] for dic in open(getpath() + '/' +self.options['dictname'],'r'))
# brute-force the candidates concurrently on the gevent pool
self.pool.map(self.checkdomain,dic_list)
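# checkdomain() itself is not shown above; a rough sketch of what it might do,
# assuming dnspython, is given below. self.blockip follows the snippet; the
# self.result store and everything else are assumptions.
import dns.resolver

def checkdomain(self, domain, flag=None):
    try:
        answers = dns.resolver.resolve(domain, 'A')
    except Exception:
        return []
    ips = [answer.address for answer in answers]
    if flag == 'block':
        # caller stores the wildcard IP so matching hits can be filtered out
        return ips
    if getattr(self, 'blockip', None) not in ips:
        self.result.append((domain, ips))  # assumed result store
    return ips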
def loop(self):
self.listen()
self.connect()
self.logger.info('listening for jobs on %s', self.address)
while self.should_run():
if self.pool.free_count() == 0:
self.logger.info('waiting for an execution slot')
self.pool.wait_available()
job = self.sockets.recv_safe('pull-in')
if job:
self.logger.info('received job')
self.pool.spawn(self.dispatch, job)
else:
self.notify_available()
gevent.sleep(1)
def _load_dns_servers(self):
print '[+] Validate DNS servers ...'
self.dns_servers = []
pool = Pool(30)
for server in open('dict/dns_servers.txt').xreadlines():
server = server.strip()
if server:
pool.apply_async(self._test_server, (server,))
pool.join()
self.dns_count = len(self.dns_servers)
sys.stdout.write('\n')
print '[+] Found %s available DNS Servers in total' % self.dns_count
if self.dns_count == 0:
print '[ERROR] No DNS Servers available.'
sys.exit(-1)
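# _test_server() is referenced but not shown; a minimal sketch, assuming
# dnspython, that keeps only servers answering a baseline query:
import sys
import dns.resolver

def _test_server(self, server):
    resolver = dns.resolver.Resolver(configure=False)
    resolver.nameservers = [server]
    resolver.lifetime = 3
    try:
        resolver.resolve('example.com', 'A')  # any well-known record works as a probe
        self.dns_servers.append(server)
        sys.stdout.write('.')
        sys.stdout.flush()
    except Exception:
        pass  # unreachable or misbehaving server; skip it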
def run(self):
if self.beanstalk:
generator = self.beanstalk.get_workgenerator(self)
else:
generator = ListWorkGenerator(self)
pool = gevent.pool.Pool(self.options.concurrency)
self.finished = 0
if self.progress:
self.progress.start(generator.total)
try:
for worker in generator.getall():
pool.add(gevent.spawn(worker.run))
except KeyboardInterrupt:
print("Ctrl+C caught... stopping")
pool.join()
if self.progress:
self.progress.finish()
def run(args):
if args.download:
resolvers = download_resolvers()
else:
resolvers = load_resolvers(args.resolvers)
random.shuffle(resolvers)
pool = gevent.pool.Pool(args.concurrency)
bar = progressbar.ProgressBar(redirect_stdout=True, redirect_stderr=True)
for resolver in bar(resolvers):
pool.add(gevent.spawn(check_resolver, args, resolver))
pool.join()
def __init__(self):
self.pool_size = None
self.pool = Pool(self.pool_size)
self.session = requests.Session()
self.timeout = 10
self.url = None
self.response = None
def send(self,hibp_obj):
'''
Spawns gevent/pool threads that will run the execute method on each
HIBP object.
Attributes:
- hibp_obj -> HIBP object
'''
if self.pool is not None:
return self.pool.spawn(hibp_obj.execute)
return gevent.spawn(hibp_obj.execute)
def imap(self,hibp_objs):
'''
Lazily + Asynchronously map the HIBP execution job to multiple queries.
Attributes:
- hibp_objs - list of HIBP objects
'''
for hibp_obj in self.pool.imap_unordered(HIBP.execute, hibp_objs):
yield hibp_obj.response
self.pool.join()
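# Illustrative use of the wrapper above (class and constructor names are
# assumptions; the original snippet does not show how HIBP objects are built):
#
#     client = AsyncHIBP()
#     queries = [HIBP() for _ in range(3)]   # build real HIBP query objects here
#     for response in client.imap(queries):
#         print(response)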
def run(self):
logger.info("starting task daemon...")
pool = gevent.pool.Pool(self.pool_size)
for i in range(self.pool_size):
pool.apply_async(self.consumer)
p = gevent.spawn(self.producer)
p.join()
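# The producer/consumer pair is not shown above; a minimal sketch, assuming the
# daemon keeps work items on a gevent.queue.Queue stored as self.queue and has a
# handle() method (both assumptions):
def producer(self):
    # push pending work onto the shared queue
    for task in self.fetch_tasks():  # assumed source of tasks
        self.queue.put(task)

def consumer(self):
    # each pool slot drains the queue; get() blocks while the queue is empty
    while True:
        task = self.queue.get()
        self.handle(task)  # assumed task handler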
def gevent_click_page():
global TRY_COUNT
TRY_COUNT = int(sys.argv[1])
_log.info('start simulating page clicks...')
# first count how many posts there are
driver = webdriver.PhantomJS()
driver.get('https://www.xncoding.com/archives/')
# driver.maximize_window()
posts_count = len(driver.find_elements_by_xpath(
'//article/header/h1[@class="post-title"]/a[@class="post-title-link"]'))
driver.close()
# split the posts evenly across the gevent group
psize = posts_count // THREAD_COUNT
_log.info('total posts: {}, posts per greenlet: {}'.format(posts_count, psize))
group = Group()
for i in range(0, THREAD_COUNT + 1):
group.add(gevent.spawn(_click_page, posts_count, psize, i))
group.join()
_log.info('finished clicking...')
def boss():
# headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
def _(url):
# response = requests.get(url, headers=headers)
response = requests.get(url)
# sleep(300 / 1000)
print(url)
tasks.put_nowait(url)
global nums
page_url_base = 'http://www.mala.cn/forum-70-{0}.html'
page_urls = [page_url_base.format(i) for i in range(1, 100)]
nums = len(page_urls)
# [pool.apply_async(_, args=(obj,)) for obj in page_urls]
[pool.apply(_, args=(obj,)) for obj in page_urls]
# pool.map_async(_, page_urls)
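# boss() above only fills the queue; a matching worker that drains it might
# look like this (illustrative; parsing of each forum page is omitted):
def worker():
    global nums
    while nums:
        url = tasks.get()  # `tasks` is the shared gevent queue used by boss()
        # fetch and parse the thread list for this page here
        nums -= 1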
isi_data_insights_daemon.py (project: isilon_data_insights_connector, author: Isilon)
def __init__(self, pidfile):
"""
Initialize.
:param pidfile: path to the daemon's pidfile (required).
"""
super(IsiDataInsightsDaemon, self).__init__(pidfile=pidfile)
self._stat_sets = {}
self._update_intervals = []
self._stats_processor = None
self._stats_processor_args = None
self._process_stats_func = None
self.async_worker_pool = gevent.pool.Pool(MAX_ASYNC_QUERIES)
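# Illustrative only: one way the async worker pool above might be used to run
# stats queries without blocking the update loop (method names are assumptions):
def _query_stats_async(self, cluster, stat_keys):
    if self.async_worker_pool.full():
        # throttle: wait for one of the MAX_ASYNC_QUERIES slots to free up
        self.async_worker_pool.wait_available()
    self.async_worker_pool.spawn(self._query_stats, cluster, stat_keys)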
def __init__(self, func, pool_size=100, timeout=None):
# XXX: Is it necessary to patch all? I know we need at least, socket,
# ssl, dns, and signal. There may be calls inside boto/botocore that
# require more patching.
super(GeventWorker, self).__init__(func, pool_size=pool_size,
timeout=timeout)
self.logger = get_logger(__name__)
self.pool = Pool(size=pool_size)
def process_messages(self, messages):
greenlet_to_message = {}
processed = []
self.logger.debug('processing %d messages', len(messages))
for message in messages:
try:
g = self.pool.spawn(self.func, message)
except:
self.logger.exception('cannot submit jobs to pool')
raise
greenlet_to_message[g] = message
for g in gevent.iwait(greenlet_to_message):
message = greenlet_to_message.pop(g)
try:
if g.exception:
raise g.exception
except:
self.logger.exception('exception processing message %s',
message.message_id)
else:
processed.append(message)
return processed
def shutdown(self):
self.pool.join()
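# Illustrative usage of GeventWorker (message objects with a message_id
# attribute and the `messages` batch are assumptions, e.g. fetched from SQS):
#
#     def handle(message):
#         print('handling', message.message_id)
#
#     worker = GeventWorker(handle, pool_size=10)
#     processed = worker.process_messages(messages)
#     worker.shutdown()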
geventserver.py (project: arduino-ciao-meteor-ddp-connector, author: andrea689)
def run_application(self):
upgrade_header = self.environ.get('HTTP_UPGRADE', '').lower()
if upgrade_header:
try:
# Build and start the HTTP response
self.environ['ws4py.socket'] = self.socket or self.environ['wsgi.input'].rfile._sock
self.result = self.application(self.environ, self.start_response) or []
self.process_result()
except:
raise
else:
del self.environ['ws4py.socket']
self.socket = None
self.rfile.close()
ws = self.environ.pop('ws4py.websocket')
if ws:
self.server.pool.track(ws)
else:
gevent.pywsgi.WSGIHandler.run_application(self)
geventserver.py (project: arduino-ciao-meteor-ddp-connector, author: andrea689)
def __init__(self, *args, **kwargs):
"""
WSGI server that simply tracks websockets
and send them a proper closing handshake
when the server terminates.
Other than that, the server is the same
as its :class:`gevent.pywsgi.WSGIServer`
base.
"""
_WSGIServer.__init__(self, *args, **kwargs)
self.pool = GEventWebSocketPool()
geventserver.py (project: arduino-ciao-meteor-ddp-connector, author: andrea689)
def stop(self, *args, **kwargs):
self.pool.clear()
_WSGIServer.stop(self, *args, **kwargs)
def send(r, pool=None):
"""Sends the request object using the specified pool. If a pool isn't
specified this method blocks. Pools are useful because you can specify size
and can hence limit concurrency."""
if pool is not None:
return pool.spawn(r.send)
return gevent.spawn(r.send)
# Patched requests.api functions.