def test_group(self):
    """Exercise a gevent ``Group`` with three greenlets that print a message.

    Two greenlets are added and joined first; a third, spawned up front,
    is added to the same group afterwards and joined separately.
    """

    def talk(msg):
        # Print the message three times. ``range`` is used instead of
        # ``xrange`` so the test runs on both Python 2 and Python 3.
        for _ in range(3):
            print(msg)

    g1 = gevent.spawn(talk, 'bar')
    g2 = gevent.spawn(talk, 'foo')
    g3 = gevent.spawn(talk, 'fizz')

    group = Group()
    group.add(g1)
    group.add(g2)
    group.join()

    # Add the third greenlet only after the first join has returned.
    group.add(g3)
    group.join()
python类Group()的实例源码
def get_conns(cred, providers):
    """Establish provider connections concurrently using a gevent Group.

    For every name in ``providers`` the trailing digits are stripped to pick
    the connection function ("aws", "azure" or "gcp"), and ``get_conn`` is
    mapped over ``[function, cred[name], name]`` triples.

    Args:
        cred: mapping indexed by provider name; the value is passed through
            to ``get_conn`` untouched.
        providers: iterable of provider names such as "aws" or "aws2".

    Returns:
        dict built by merging every result of ``get_conn`` with
        ``dict.update`` (presumably provider name -> connection object;
        verify against ``get_conn``).

    Raises:
        KeyError: if a provider name (digits stripped) is not a known cloud
            service, or is missing from ``cred``.
    """
    cld_svc_map = {"aws": conn_aws,
                   "azure": conn_az,
                   "gcp": conn_gcp}
    sys.stdout.write("\rEstablishing Connections: ")
    sys.stdout.flush()
    busy_obj = busy_disp_on()
    try:
        conn_fn = [[cld_svc_map[x.rstrip('1234567890')], cred[x], x]
                   for x in providers]
        cgroup = Group()
        conn_res = cgroup.map(get_conn, conn_fn)
        cgroup.join()
        conn_objs = {}
        for item in conn_res:
            conn_objs.update(item)
    finally:
        # Always stop the busy indicator and restore the cursor, even when
        # building the work list or connecting raises.
        busy_disp_off(dobj=busy_obj)
        sys.stdout.write("\r \r")
        sys.stdout.write("\033[?25h")  # cursor back on
        sys.stdout.flush()
    return conn_objs
def get_data(conn_objs, providers):
    """Refresh node data concurrently using previous connection objects.

    For every name in ``providers`` the trailing digits are stripped to pick
    the node-collection function ("aws", "azure" or "gcp"), and ``get_nodes``
    is mapped over ``[function, conn_objs[name]]`` pairs.

    Args:
        conn_objs: mapping indexed by provider name; the value is passed
            through to ``get_nodes`` untouched.
        providers: iterable of provider names such as "aws" or "aws2".

    Returns:
        The list returned by ``Group.map``: one ``get_nodes`` result per
        provider, in ``providers`` order.

    Raises:
        KeyError: if a provider name (digits stripped) is not a known cloud
            service, or is missing from ``conn_objs``.
    """
    cld_svc_map = {"aws": nodes_aws,
                   "azure": nodes_az,
                   "gcp": nodes_gcp}
    sys.stdout.write("\rCollecting Info: ")
    sys.stdout.flush()
    busy_obj = busy_disp_on()
    try:
        collec_fn = [[cld_svc_map[x.rstrip('1234567890')], conn_objs[x]]
                     for x in providers]
        ngroup = Group()
        node_list = ngroup.map(get_nodes, collec_fn)
        ngroup.join()
    finally:
        # Always stop the busy indicator and restore the cursor, even when
        # building the work list or collecting raises.
        busy_disp_off(dobj=busy_obj)
        sys.stdout.write("\r \r")
        sys.stdout.write("\033[?25h")  # cursor back on
        sys.stdout.flush()
    return node_list
def __init__(self, locust_classes, options):
    """Store the run configuration and hook the hatch-complete event.

    Args:
        locust_classes: stored as-is on ``self.locust_classes``.
        options: object providing ``hatch_rate``, ``num_clients``,
            ``num_requests`` and ``host`` attributes.
    """
    # Configuration copied from the options object.
    self.host = options.host
    self.num_requests = options.num_requests
    self.num_clients = options.num_clients
    self.hatch_rate = options.hatch_rate
    self.locust_classes = locust_classes

    # Runtime bookkeeping.
    self.stats = global_stats
    self.exceptions = {}
    self.hatching_greenlet = None
    self.state = STATE_INIT
    self.locusts = Group()

    def _reset_stats_after_hatch(user_count):
        # Once hatching has completed, mark the runner as running and
        # reset all statistics.
        self.state = STATE_RUNNING
        logger.info("Resetting stats\n")
        self.stats.reset_all()

    events.hatch_complete += _reset_stats_after_hatch
def kill_locusts(self, kill_count):
    """
    Kill greenlets in self.locusts that match a weighted selection of locusts.

    ``self.weight_locusts(kill_count)`` supplies the selection; the number
    actually killed is the length of that selection. ``self.num_clients`` is
    reduced by that amount and ``events.hatch_complete`` is fired with the
    new client count.
    """
    to_kill = self.weight_locusts(kill_count)
    kill_count = len(to_kill)
    self.num_clients -= kill_count
    logger.info("Killing %i locusts" % kill_count)

    # First pass: pair each selected entry with at most one greenlet whose
    # first spawn argument equals it. Matched entries are consumed so that
    # each one accounts for exactly one greenlet.
    victims = []
    for greenlet in self.locusts:
        for candidate in to_kill:
            if candidate == greenlet.args[0]:
                victims.append(greenlet)
                to_kill.remove(candidate)
                break

    # Second pass: kill outside the iteration over the group.
    for greenlet in victims:
        self.locusts.killone(greenlet)

    events.hatch_complete.fire(user_count=self.num_clients)
def pipeline(stages, initial_data):
    """Wire ``stages`` together with queues and start one monitor per stage.

    Args:
        stages: sequence of stage objects; each gets ``in_q`` and ``out_q``
            attributes assigned here.
        initial_data: iterable feeding the first stage. If it has an
            ``append`` method, ``StopIteration`` is appended to it in place
            as the end-of-data marker.

    Returns:
        ``PipelineResult(monitors, out)`` where ``monitors`` is the Group of
        stage-monitor greenlets and ``out`` is the last stage's output queue
        (or an empty list when there are no stages).

    Raises:
        TypeError: if ``initial_data`` is not iterable.
    """
    monitors = Group()

    # Make sure items in initial_data are iterable.
    if not isinstance(initial_data, types.GeneratorType):
        try:
            iter(initial_data)
        except TypeError:
            # Only a failed iter() means "not iterable"; anything else
            # (e.g. KeyboardInterrupt) must not be masked.
            raise TypeError('initial_data must be iterable')

    # The StopIteration will bubble through the queues as it is reached.
    # Once a stage monitor sees it, it indicates that the stage will read
    # no more data and the monitor can wait for the current work to complete
    # and clean up.
    if hasattr(initial_data, 'append'):
        initial_data.append(StopIteration)

    if not stages:
        return PipelineResult(monitors, [])

    # Chain stage queue io: each stage shares an output queue with the next
    # stage's input.
    qs = [initial_data] + [Queue() for _ in stages]
    for stage, in_q, out_q in zip(stages, qs[:-1], qs[1:]):
        stage.in_q = in_q
        stage.out_q = out_q
        monitors.spawn(stage_monitor, stage)

    # Yield once so other greenlets get a chance to run before returning.
    gevent.sleep(0)

    return PipelineResult(monitors, stages[-1].out_q)
def gevent_click_page():
    """Count the posts on the archive page, then run ``_click_page`` in greenlets.

    Reads ``TRY_COUNT`` from ``sys.argv[1]`` (stored in the module global),
    uses a PhantomJS driver to count post-title links on the archive page,
    and spawns ``THREAD_COUNT + 1`` greenlets that each receive the post
    count, the per-greenlet share and their index.

    Raises:
        IndexError: if no command-line argument is given.
        ValueError: if ``sys.argv[1]`` is not an integer.
    """
    global TRY_COUNT
    TRY_COUNT = int(sys.argv[1])

    _log.info('????????...')
    # Open the archive page once just to count the posts.
    driver = webdriver.PhantomJS()
    try:
        driver.get('https://www.xncoding.com/archives/')
        # driver.maximize_window()
        posts_count = len(driver.find_elements_by_xpath(
            '//article/header/h1[@class="post-title"]/a[@class="post-title-link"]'))
    finally:
        # Close the browser even if loading or querying the page fails.
        driver.close()

    # Split the posts across the greenlets.
    # NOTE(review): "/" is true division on Python 3, so psize may be a
    # float there; confirm what _click_page expects.
    psize = posts_count / THREAD_COUNT
    _log.info('???????:{}, ??????????:{}'.format(posts_count, psize))
    group = Group()
    # NOTE(review): THREAD_COUNT + 1 greenlets are spawned, presumably so
    # the last one handles the remainder; confirm against _click_page.
    for i in range(0, THREAD_COUNT + 1):
        group.add(gevent.spawn(_click_page, posts_count, psize, i))
    group.join()

    _log.info('????...')
def __init__(self, callback=None, ttype=None, source=None):
    """Create the greenlet group, the task queue and the task name.

    Args:
        callback: stored on ``self.cb``.
        ttype: stored on ``self.task_type``.
        source: stored on ``self.source``.

    The task name is "<hostname>-<source>-<task type>".
    """
    self.cb = callback
    self.source = source
    self.task_type = ttype

    self.group = Group()
    self.task_queue = Queue()

    name_parts = (socket.gethostname(), self.source, self.task_type)
    self.task_name = "%s-%s-%s" % name_parts
def __init__(self, *args, **kwargs):
    """Set up the master runner: slave registry, RPC server, listener, events.

    All arguments are forwarded unchanged to the parent initializer.
    """
    super(MasterLocustRunner, self).__init__(*args, **kwargs)

    class SlaveNodesDict(dict):
        """dict of slave nodes with shortcuts for selecting nodes by state."""

        def get_by_state(self, state):
            # Collect every stored node whose ``state`` equals ``state``.
            matching = []
            for node in self.values():
                if node.state == state:
                    matching.append(node)
            return matching

        @property
        def ready(self):
            return self.get_by_state(STATE_INIT)

        @property
        def hatching(self):
            return self.get_by_state(STATE_HATCHING)

        @property
        def running(self):
            return self.get_by_state(STATE_RUNNING)

    self.clients = SlaveNodesDict()
    self.server = rpc.Server(self.master_bind_host, self.master_bind_port)

    # Run the client listener in its own greenlet; exceptions raised in it
    # are routed to self.noop.
    self.greenlet = Group()
    listener = self.greenlet.spawn(self.client_listener)
    listener.link_exception(callback=self.noop)

    def on_slave_report(client_id, data):
        # Record how many locust users a known slave has spawned; reports
        # from unknown slaves are logged and dropped.
        if client_id in self.clients:
            self.clients[client_id].user_count = data["user_count"]
        else:
            logger.info("Discarded report from unrecognized slave %s", client_id)

    events.slave_report += on_slave_report

    def on_quitting():
        # Delegate to self.quit() when the quitting event fires.
        self.quit()

    events.quitting += on_quitting
def __init__(self, *args, **kwargs):
    """Set up the slave runner: client id, RPC client, greenlets, events.

    All arguments are forwarded unchanged to the parent initializer.
    """
    super(SlaveLocustRunner, self).__init__(*args, **kwargs)

    # Client id: hostname plus the md5 hex digest of the current time offset
    # by a random integer.
    hostname = socket.gethostname()
    seed = str(time() + random.randint(0, 10000)).encode('utf-8')
    self.client_id = hostname + "_" + md5(seed).hexdigest()

    self.client = rpc.Client(self.master_host, self.master_port)
    self.greenlet = Group()

    # Start the worker, announce readiness to the master, then start the
    # stats reporter (same order as before). Exceptions in either greenlet
    # are routed to self.noop.
    worker = self.greenlet.spawn(self.worker)
    worker.link_exception(callback=self.noop)
    self.client.send(Message("client_ready", None, self.client_id))
    reporter = self.greenlet.spawn(self.stats_reporter)
    reporter.link_exception(callback=self.noop)

    def on_hatch_complete(user_count):
        # Report to the master that all locust users have hatched.
        payload = {"count": user_count}
        self.client.send(Message("hatch_complete", payload, self.client_id))

    events.hatch_complete += on_hatch_complete

    def on_report_to_master(client_id, data):
        # Add the current number of spawned locusts to the outgoing report.
        data["user_count"] = self.user_count

    events.report_to_master += on_report_to_master

    def on_quitting():
        # Send the quit message to the master.
        self.client.send(Message("quit", None, self.client_id))

    events.quitting += on_quitting

    def on_locust_error(locust_instance, exception, tb):
        # Forward the exception text and formatted traceback to the master.
        formatted_tb = "".join(traceback.format_tb(tb))
        payload = {"msg": str(exception), "traceback": formatted_tb}
        self.client.send(Message("exception", payload, self.client_id))

    events.locust_error += on_locust_error
def __init__(self):
    """Start with empty module lists, a fresh Group, and no app or process id."""
    self.group = Group()

    # Module bookkeeping lists, both initially empty.
    self.mods = []
    self.runMods = []

    # Not set at construction time.
    self.app = None
    self.ProcessID = None
def __init__(self, name, group=None):
    """Initialise the per-instance command tables, group and queue.

    Args:
        name: stored on ``self.name``.
        group: gevent group to use for this instance. When omitted, a new
            ``pool.Group()`` is created per instance. The previous default
            was built once at definition time, so every instance created
            without an explicit group shared the same Group object.
    """
    self.name = name
    self._commands = {}  # cmd_id : command
    self._greenlets = {}  # cmd_id : greenlet
    self._decorators = {}  # cmd_id : callable
    # Compare against None rather than truthiness, so an explicitly passed
    # group is always kept.
    self._group = pool.Group() if group is None else group
    self._queue = queue.Queue()
    # Register a weak reference to this instance in ``self.services``.
    self.services.append(weakref.ref(self))
def stage_monitor(stage):
    """
    Stage monitor is a worker that monitors a stage while it is being executed.
    The stage monitor coordinates running stage workers, saving results, and
    determining the end of any particular stage.

    Args:
        stage: object providing ``n_workers``, ``func``, ``in_q``, ``out_q``
            and ``returns_many``.

    Returns:
        The same ``stage`` object, after ``StopIteration`` has been put on
        its output queue.
    """
    # Pool of stage function worker greenlets.
    work_pool = Pool(size=stage.n_workers)
    # Group of greenlets which save results from workers via callbacks.
    save_group = Group()

    def save_result(x):
        """
        Save results onto the output queue as a tuple or if there is only
        a single returned value, save that instead as that singular item.
        """
        if isinstance(stage, Reduce):
            # XXX: This would not work for stream inputs afaict
            # But, reduction should not work anyway
            # Forward the result only when exactly one item remains across
            # the worker pool, the save group and the input queue.
            if len(work_pool) + len(save_group) + len(stage.in_q) == 1:
                stage.out_q.put(x)
        elif not stage.returns_many:
            stage.out_q.put(x)
        else:
            # Only the "is x iterable?" probe is guarded. A failure while
            # emitting items must not fall back to also emitting [x].
            try:
                items = iter(x)
            except TypeError:
                stage.out_q.put([x])
            else:
                for i in items:
                    stage.out_q.put(i)

    # Iterate the input queue until StopIteration is received.
    # Spawn new workers for work items on the input queue.
    # Keep track of storing results via a group of result saving greenlets.
    # Ignore all DROP items in the input queue.
    # Once we receive a StopIteration, wait for all open workers to finish
    # and once they are finished, bubble the StopIteration to the next stage.
    for x in stage.in_q:
        # Yield so other greenlets can run between work items.
        gevent.sleep(0)
        if x is DROP:
            continue
        if x is StopIteration:
            break
        func_args = [x]
        cb_worker = work_pool.apply_async(stage.func,
                                          func_args,
                                          callback=save_result)
        save_group.add(cb_worker)
        logger.debug('Worker Pool: << {} >>'.format(work_pool))

    work_pool.join()
    save_group.join()
    stage.out_q.put(StopIteration)
    return stage