def map(self,hibp_objs):
'''
Asynchronously map the HIBP execution job to multiple queries.
Attributes:
- hibp_objs - list of HIBP objects
'''
jobs = [self.send(hibp_obj) for hibp_obj in hibp_objs]
gevent.joinall(jobs, timeout=self.timeout)
return hibp_objs
python类joinall()的实例源码
def map(self,hibp_objs):
'''
Asynchronously map the HIBP execution job to multiple queries.
Attributes:
- hibp_objs - list of HIBP objects
'''
jobs = [self.send(hibp_obj) for hibp_obj in hibp_objs]
gevent.joinall(jobs, timeout=self.timeout)
return hibp_objs
def map(self,hibp_objs):
'''
Asynchronously map the HIBP execution job to multiple queries.
Attributes:
- hibp_objs - list of HIBP objects
'''
jobs = [self.send(hibp_obj) for hibp_obj in hibp_objs]
gevent.joinall(jobs, timeout=self.timeout)
return hibp_objs
def tearDown(self):
contexts = set([self.context])
while self.sockets:
sock = self.sockets.pop()
contexts.add(sock.context) # in case additional contexts are created
sock.close()
try:
gevent.joinall([gevent.spawn(ctx.term) for ctx in contexts], timeout=2, raise_error=True)
except gevent.Timeout:
raise RuntimeError("context could not terminate, open sockets likely remain in test")
def web_ip_cache(self):
while True:
t1 = time.time()
try:
r = redis.StrictRedis(REDIS_SERVER,REDIS_PORT,DB_FOR_IP)
num,ips = self.db_set_select(r,REDIS_SET_CACHE,False,WEB_CACHE_IP_NUM)
self.cur_num = num
self.cur_pos = 0
self.len = num
#print ips
#print "cur num",self.cur_num,self.cur_pos,self.len
if num >0 and ips != None and len(ips) > 0 :
glist = [gevent.spawn(self.test_ip,r,ips,True) for i in range(GEVENT_NUM)]
gevent.joinall(glist)
times = 0
while self.cur_num < WEB_CACHE_IP_NUM and times < 1024:
#print "cur num",self.cur_num
n = (WEB_CACHE_IP_NUM - self.cur_num)*2
num,ips = self.db_set_select(r,REDIS_SORT_SET_COUNTS,True,n)
self.cur_pos = 0
self.len = num
times += 1
if num == 0 or ips == None:
continue
glist = [gevent.spawn(self.test_ip,r,ips,False) for i in range(GEVENT_NUM)]
gevent.joinall(glist)
#print "cur num end ",self.cur_num
except Exception as e:
#print e
log.error("PID:%d web cache error:%s" % (os.getpid(),e))
finally:
t2 = time.time()
#print "sleep"
t = WEB_CACHE_REFRESH - ( t2 - t1 )
if t > 0:
time.sleep(t)
def get_proxy(q,msg_queue):
bloom.clear()
times = 0
while True:
try:
num = db_zcount()
log.debug("PID:%d db current ips %d------" % (os.getpid(),num))
while num > MIN_NUM:
time.sleep(REFRESH_WEB_SITE_TIMEER)
times += 1
if times == REFRESH_BF:
bloom.clear()
times = 0
log.debug("PID:%d refresh bloom filter" % os.getpid())
msg_queue.put("OK")
t1 = time.time()
event = []
for key,value in URL_PATTERN.items():
event.append(gevent.spawn(worker,value,q))
gevent.joinall(event)
t2 = time.time()
t = REFRESH_WEB_SITE_TIMEER - (t2 - t1)
times += 1
if t > 0:
time.sleep(t)
log.debug("PID:%d proxy sleep end------" % os.getpid())
if times == REFRESH_BF:
bloom.clear()
times = 0
log.debug("PID:%d refresh bloom filter" % os.getpid())
except Exception as e:
log.error("PID:%d proxy error:%s" % os.getpid(),e.message())
def start(self):
"""Starts Cobalt only if the ETCD versions match with the defined one.
Returns:
bool: If the start operation succeded or not
"""
if not self._ensure_versions_match():
return False
routines = []
for _, service in self.services.items():
routines += service.start()
gevent.joinall(routines)
def testCall(self):
obj1 = ExampleClass()
obj2 = ExampleClass()
s = time.time()
assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
assert around(time.time() - s, 0.0) # First allow to call instantly
assert obj1.counted == 1
# Call again
assert not RateLimit.isAllowed("counting", 0.1)
assert RateLimit.isAllowed("something else", 0.1)
assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
assert around(time.time() - s, 0.1) # Delays second call within interval
assert obj1.counted == 2
# Call 3 times async
s = time.time()
assert obj2.counted == 0
threads = [
gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # Instant
gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # 0.1s delay
gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)) # 0.2s delay
]
gevent.joinall(threads)
assert [thread.value for thread in threads] == ["counted", "counted", "counted"]
assert around(time.time() - s, 0.2)
# No queue = instant again
s = time.time()
assert RateLimit.isAllowed("counting", 0.1)
assert RateLimit.call("counting", allowed_again=0.1, func=obj2.count) == "counted"
assert around(time.time() - s, 0.0)
assert obj2.counted == 4
def checkModifications(self, since=None):
peers_try = [] # Try these peers
queried = [] # Successfully queried from these peers
# Wait for peers
if not self.peers:
self.announce()
for wait in range(10):
time.sleep(5 + wait)
self.log.debug("Waiting for peers...")
if self.peers:
break
peers = self.peers.values()
random.shuffle(peers)
for peer in peers: # Try to find connected good peers, but we must have at least 5 peers
if peer.findConnection() and peer.connection.handshake.get("rev", 0) > 125: # Add to the beginning if rev125
peers_try.insert(0, peer)
elif len(peers_try) < 5: # Backup peers, add to end of the try list
peers_try.append(peer)
if since is None: # No since defined, download from last modification time-1day
since = self.settings.get("modified", 60 * 60 * 24) - 60 * 60 * 24
self.log.debug("Try to get listModifications from peers: %s since: %s" % (peers_try, since))
updaters = []
for i in range(3):
updaters.append(gevent.spawn(self.updater, peers_try, queried, since))
gevent.joinall(updaters, timeout=10) # Wait 10 sec to workers done query modifications
if not queried:
gevent.joinall(updaters, timeout=10) # Wait another 10 sec if none of updaters finished
time.sleep(0.1)
self.log.debug("Queried listModifications from: %s" % queried)
return queried
# Update content.json from peers and download changed files
# Return: None
def clientprocess():
t1 = time.time()
clients = [gevent.spawn(client) for _ in range(N_CLIENTS)]
gevent.joinall(clients)
duration = time.time()-t1
print("%s clients served within %.2f s." % (N_CLIENTS, duration))
def child_test_wsgi_scenario_client(server_address):
def get():
assert urllib2.urlopen("http://%s:%s/" % server_address).read() == MSG
t1 = time.time()
clientlets = [gevent.spawn(get) for _ in range(N)]
gevent.joinall(clientlets)
duration = time.time() - t1
print("%s clients were served within %.2f s." % (N, duration))
def test_concurrency(self):
client = FakePooledThriftClientMixin(host_provider=HostsProvider(HOSTS),
pool_size=5)
self.assertEqual(0, AnotherFakeClient.in_flight_calls)
AnotherFakeClient.num_calls = 0
greenlets = []
for i in xrange(0, 10):
greenlets.append(gevent.spawn(self._run_method_success,
client, 3))
gevent.joinall(greenlets)
self.assertEqual(30, AnotherFakeClient.num_calls)
geventSimple.py 文件源码
项目:Learning-Concurrency-in-Python
作者: PacktPublishing
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def main():
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)
print([job.value for job in jobs])
def run(self):
while(True):
logging.info("[+] Spider start runing")
# print '[+] ', 'Spider start runing'
spawns = []
# ??????ip???
# select * from ip_table
db = DBHelper()
ids = db.getIds()
# self.db_ip_num.value = len(ids)
# print selected_id
# exit()
logging.info("[+] db save ip: {0}".format(len(ids)))
# print '[+] ', 'db save ip:%d' % len(ids)
if len(ids) < config.MINNUM:
logging.info("[+] now ip num < MINNUM start spider")
# print '[+] ', 'now ip num < MINNUM start spider...'
for parser in config.parser_list:
if ids:
selected_id = random.choice(ids)
ip = db.getIp(selected_id)
else:
ip = ''
spawns.append(gevent.spawn(self.spider, parser, ip))
if len(spawns) >= config.MAX_DOWNLOAD_CONCURRENT:
gevent.joinall(spawns)
spawns = []
gevent.joinall(spawns)
else:
logging.info("[+] now ip num meet the requirement,wait check again...'")
# print '[+] ', 'now ip num meet the requirement,wait check again...', '#'
logging.info("[+] sleep now")
# print "sleep now"
time.sleep(config.CHECK_INTERVAL)
def start_monitors(mount_points):
greenlets = []
for mp in mount_points:
greenlet = gevent.spawn(monitor, mp)
greenlet.start()
greenlets.append(greenlet)
gevent.joinall(greenlets)
def testcase():
print "Version: ", gevent.version_info
gevent.joinall([gevent.spawn(get_data, 'https://python.org/'),
gevent.spawn(get_data, 'https://www.yahoo.com/'),
gevent.spawn(get_data, 'https://github.com/')])
def asynchronous():
# ??????
from gevent.event import AsyncResult
# ??????????AsyncRresult?????????????????????
# ???future?defered???????????????????????????
a = AsyncResult()
def setter():
"""
After 3 seconds set the result of a.
"""
gevent.sleep(1)
a.set('Hello!')
def waiter():
"""
After 3 seconds the get call will unblock after the setter
puts a value into the AsyncResult.
"""
print(a.get())
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)
def _stop_handler(self):
"""
Handle stop event.
"""
gevent.joinall([self._watcher_co, self._heartbeat_co])
service_proto.unregister(self.ec, self._token)
log.info('OctpServer(%s) stopped.', self.service_name)
def _stop_handler(self):
gevent.joinall([self._watcher_starter_coroutine,])
log.info('OctpClient(%s) stopped.', self.service_names)