def stop(self, *args, **kwargs):
    self.pool.clear()
    _WSGIServer.stop(self, *args, **kwargs)
Example source code for the Python pool() class
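All of the snippets on this page center on gevent's Pool, which caps how many greenlets run concurrently. As a point of reference, here is a minimal sketch of the shared pattern; the fetch function and example URLs are illustrative placeholders, not taken from any of the quoted projects:

import gevent
from gevent.pool import Pool

def fetch(url):
    # Hypothetical work item; the real snippets below verify POCs,
    # download pages, or talk to an event socket.
    gevent.sleep(0.1)
    return url

pool = Pool(10)  # at most 10 greenlets run at once
for i in range(50):
    pool.spawn(fetch, 'http://example.com/%d' % i)
pool.join()  # block until every spawned greenlet finishes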
def __init__(self, options):
    self.options = options
    self.blockip = None
    self.keywords = False
    self.pool = Pool(self.options['threads_count'])
    # Output file is named after the target, with dots replaced by underscores.
    self.document = self.options['target'].replace('.', '_') + '.txt'
    # Global socket timeout so slow targets cannot hang the pool.
    socket.setdefaulttimeout(self.options['timeout'])
def _protocol_send(self, command, args=""):
    if self._closing_state:
        return Event()
    self.trace("_protocol_send %s %s" % (command, args))
    # Append the command to the pool and send it to the eventsocket.
    _cmd_uuid = str(uuid1())
    _async_res = gevent.event.AsyncResult()
    with self._lock:
        self._commands_pool.append((_cmd_uuid, _async_res))
        self._send("%s %s" % (command, args))
    self.trace("_protocol_send %s wait ..." % command)
    _uuid, event = _async_res.get()
    if _cmd_uuid != _uuid:
        raise InternalSyncError("in _protocol_send")
    # Cast the Event to the appropriate response type:
    # ApiResponse for "api", BgapiResponse for "bgapi",
    # CommandResponse for everything else.
    if command == 'api':
        event = ApiResponse.cast(event)
    elif command == "bgapi":
        event = BgapiResponse.cast(event)
    else:
        event = CommandResponse.cast(event)
    self.trace("_protocol_send %s done" % command)
    return event
def test_concurrency_with_delayed_url(self):
    dh = DownloadHandler(self.spider, self.driver, self.driver_sem)
    n = 5
    pool = Pool(n)
    urls = []
    for i in range(n):
        urls.append(HTTPBIN_URL + '/delay/1')
    time_start = time.time()
    pool.map(dh.fetch, [Request(url) for url in urls])
    time_total = time.time() - time_start
    # With n greenlets fetching n one-second-delay URLs concurrently,
    # the total time should stay well under n sequential seconds.
    self.assertLess(time_total, n)
def test_dynamic_request_concurrency(self):
    self.driver = webdriver.PhantomJS()
    dh = DownloadHandler(self.spider, self.driver, self.driver_sem)
    n = 5
    pool = Pool(n)
    urls = []
    for i in range(n):
        urls.append(HTTPBIN_URL + '/delay/1')
    time1 = time.time()
    pool.map(dh.fetch, [Request(url, dynamic=True, wait=5) for url in urls])
    # Dynamic requests share a single PhantomJS driver, so they serialize
    # and the elapsed time exceeds n seconds despite the pool.
    self.assertGreater(time.time() - time1, n)
    self.driver.close()
def run_task_in_gevent(url_list, poc_file_dict):
    # url_list: the list of target URLs to verify
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        for plugin_type, poc_files in poc_file_dict.iteritems():
            for poc_file in poc_files:
                if target and poc_file:
                    target = fix_target(target)
                    pool.add(gevent.spawn(poc.poc_verify, target, plugin_type, poc_file))
    pool.join()
def benchmark():
    gevent.spawn(printstats)
    for _ in xrange(1000):
        pool.spawn(bench, itemid)
    pool.join()
def main():
    import argparse
    parser = argparse.ArgumentParser(description='Listing ids of a (leaf) category')
    parser.add_argument('--cid', '-c', type=int, help='taobao cid, e.g. 51106012', required=True)
    parser.add_argument('--pool', '-p', action='store_true', help='use gevent pool to boost execution')
    parser.add_argument('--num_paths', '-n', type=int, default=2, help='number of paths, default to 2')
    parser.add_argument('--max_page', '-m', type=int, default=1, help='max page, default to 1')
    option = parser.parse_args()
    print('total items: {}'.format(len(test_list(option.cid, option.pool, option.num_paths, option.max_page))))
def __init__(self, poolsize=5):
    self.pool = gevent.pool.Pool(poolsize)
def main():
    import argparse
    parser = argparse.ArgumentParser(description='Call Worker with arguments')
    parser.add_argument('--worker', '-w', choices=['aggregate'], help='worker type, can be "aggregate"', required=True)
    parser.add_argument('--poolsize', '-p', type=int, default=5, help='gevent pool size for worker (default: %(default)s)')
    option = parser.parse_args()
    if option.worker == "aggregate":
        AggregateWorker(option.poolsize).work()
def run_bugscan(url_list):
    from tools.pocs.bugscan import Bugscan
    PLUGINS_DIR = 'D:\\Projects\\xlcscan\\tools\\pocs\\'
    poc = Bugscan()
    pool = Pool(100)
    for target in url_list:
        for poc_file in bugscan_name_list:
            if target and poc_file:
                target = fix_target(target)
                poc_file = PLUGINS_DIR + 'bugscan' + '\\' + poc_file
                pool.add(gevent.spawn(poc.run, target, poc_file))
    pool.join()
def run_task_in_gevent(url_list, poc_file_dict):
    poc = Poc_Launcher()
    pool = Pool(100)
    for target in url_list:
        for poc_file in poc_file_dict:
            if target and poc_file:
                try:
                    target = fix_domain(target)
                except Exception as e:
                    target = fix_host(target)
                #print target, poc_file, "^^^^^^^^"
                pool.add(gevent.spawn(poc.poc_verify, target, poc_file))
    pool.join()
def __init__( self, cloudDest, cbReceiveMessage, orgId, installerId, platform, architecture,
              sensorId = None, enrollmentToken = None,
              cbDebugLog = None, cbEnrollment = None ):
    gevent.Greenlet.__init__( self )
    self._cbDebugLog = cbDebugLog
    self._cbReceiveMessage = cbReceiveMessage
    self._cbEnrollment = cbEnrollment
    try:
        self._destServer, self._destPort = cloudDest.split( ':' )
    except:
        self._destServer = cloudDest
        self._destPort = 443
    self._oid = uuid.UUID( str( orgId ) )
    self._iid = uuid.UUID( str( installerId ) )
    self._sid = sensorId
    self._arch = architecture
    self._plat = platform
    if self._sid is not None:
        self._sid = uuid.UUID( str( self._sid ) )
    self._enrollmentToken = enrollmentToken
    self._socket = None
    self._threads = gevent.pool.Group()
    self._stopEvent = gevent.event.Event()
    self._lock = Semaphore( 1 )
    self._connectedEvent = gevent.event.Event()
    self._r = rpcm( isHumanReadable = True, isDebug = self._log )
    self._r.loadSymbols( Symbols.lookups )
    self._hcpModules = []
    self._hbsProfileHash = ( "\x00" * 32 )
def querySites( queryCat, queryAction, queryData = {}, siteProc = defaultSiteProc, qProc = defaultQueryProc ):
    global sites
    p = gevent.pool.Pool()
    ctx = {}
    # Query every site concurrently; imap_unordered yields results as they complete.
    siteResults = [ x for x in p.imap_unordered( lambda x: querySite( queryCat, queryAction, queryData, siteProc, x, ctx ), sites ) ]
    return qProc( siteResults, ctx )
###############################################################################
# PAGES
###############################################################################
def __init__( self, maxQps, cbLog = None ):
    self._maxQps = maxQps
    self._q = gevent.queue.Queue()
    self._log = cbLog
    self._transmitted = 0
    self._lastWait = time.time()
    self._isRunning = True
    self._threads = gevent.pool.Group()
    self._threads.add( gevent.spawn_later( 0, self._sendLoop ) )
    self._threads.add( gevent.spawn_later( 1, self._resetStats ) )
def __init__(self, name, steps=[]):
    self.name = name
    self.actions = Speaker(
        'actions',
        [
            'available',
            'failed',
            'started',
            'success',
            'metric',
            'error',
            'logs',
        ]
    )
    self.steps = [s.job_type for s in steps]
    self.total_steps = len(steps)
    self.context = zmq.Context()
    self.sockets = SocketManager(zmq, self.context)
    self.sockets.create('step-events', zmq.SUB)
    self.sockets.create('jobs-in', zmq.PULL)
    for step in self.steps:
        self.sockets.create(step, zmq.PUSH)
    for action in self.actions.actions.keys():
        self.bind_action(action)
    self.total_actions = len(self.actions.actions)
    self.pool = gevent.pool.Pool(self.total_actions ** (self.total_steps + 1))
    self.greenlets = []
    self._allowed_to_run = True
    self.default_interval = 0.1
    self.backend = StorageBackend()
    self.logger = logging.getLogger('pipeline')
def spawn(self, *args, **kw):
    self.greenlets.append(
        self.pool.spawn(*args, **kw)
    )
def __init__(self, pull_bind_address='tcp://127.0.0.1', subscriber_connect_address='tcp://127.0.0.1:6000', concurrency=100, timeout=1):
    self.context = zmq.Context()
    self.sockets = SocketManager(zmq, self.context)
    self.sockets.create('pull-in', zmq.PULL)
    # self.sockets.set_socket_option('pull-in', zmq.RCVHWM, concurrency)
    self.sockets.create('events', zmq.PUB)
    self.name = self.__class__.__name__
    self.subscriber_connect_address = subscriber_connect_address
    self._allowed_to_run = True
    self.pool = gevent.pool.Pool(concurrency + 1)
    self.timeout = timeout
    self.pull_bind_address = pull_bind_address
    self.id = str(uuid.uuid4())
    self.logger = self.sockets.get_logger('events', 'logs', 'logs')
def run(self, bots, wait_seconds=None):
    """
    Set up the bots and the polling interval, then spawn the required greenlets.
    :param bots: [Crawler]
    :param wait_seconds: seconds to wait between checks of the urls
    """
    self.wait_seconds, self.bots = wait_seconds, bots
    pool = Pool()
    pool.spawn(self.posts_fetcher)
    pool.spawn(self.telegram_posts_sender)
    pool.join()