def test_run(self, chat):
client = Mock()
chat.pubsub = Mock()
chat.pubsub.channels = ['quorum']
chat.pubsub.listen.return_value = [{
'type': 'message',
'channel': 'quorum',
'data': 'Calloo! Callay!',
}]
chat.greenlet = Mock()
chat.subscribe(client, 'quorum')
chat.run()
gevent.wait() # wait for event loop
client.send.assert_called_once_with('quorum:Calloo! Callay!')
def stop_and_wait(self):
# Stop handling incoming packets, but don't close the socket. The
# socket can only be safely closed after all outgoing tasks are stopped
self.transport.stop_accepting()
# Stop processing the outgoing queues
self.event_stop.set()
gevent.wait(self.greenlets)
    # All outgoing tasks are stopped. Now it is safe to close the socket; at this
    # point there might still be incoming messages being processed, but keeping
    # the socket open is not useful for them.
self.transport.stop()
# Set all the pending results to False
    for waitack in self.senthashes_to_states.values():  # values() keeps this valid on Python 3
waitack.async_result.set(False)
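# The method above encodes a shutdown ordering: stop accepting input, signal the
# outgoing workers, gevent.wait() for them to exit, and only then close the socket.
# Below is a minimal, self-contained sketch of that ordering using plain gevent
# primitives; `outgoing_worker` and the `stop` Event are illustrative stand-ins, not
# part of the original transport.
import gevent
from gevent.event import Event

stop = Event()

def outgoing_worker():
    # stand-in for a loop that drains an outgoing queue until asked to stop
    while not stop.is_set():
        gevent.sleep(0.05)

workers = [gevent.spawn(outgoing_worker) for _ in range(2)]
stop.set()            # ask every worker to finish its current iteration and exit
gevent.wait(workers)  # block until all worker greenlets have actually exited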
def send_and_wait(self, recipient, message, timeout):
""" Send `message` to `recipient` and wait for the response or `timeout`.
Args:
recipient (address): The address of the node that will receive the
message.
message: The transfer message.
        timeout (float): How long to wait for a response from `recipient`.
Returns:
None: If the wait timed out
object: The result from the event
"""
if not isaddress(recipient):
raise ValueError('recipient is not a valid address.')
    return self.protocol.send_and_wait(recipient, message, timeout)
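# send_and_wait() above resolves to the event's result, or to None when the timeout
# expires first. A minimal sketch of that wait-with-timeout behaviour using only
# gevent's AsyncResult; the spawn_later() call merely simulates a reply arriving and
# is not part of the original protocol code.
import gevent
from gevent.event import AsyncResult

reply = AsyncResult()
gevent.spawn_later(0.1, reply.set, 'ack')  # simulate a response arriving shortly
print(reply.wait(timeout=1.0))             # 'ack'; would be None had the timeout expired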
def test_leaving(raiden_network, token_addresses):
token_address = token_addresses[0]
connection_managers = [
app.raiden.connection_manager_for_token(token_address) for app in raiden_network
]
all_channels = list(
itertools.chain.from_iterable(
connection_manager.receiving_channels for connection_manager in connection_managers
)
)
leaving_async = [
app.raiden.leave_all_token_networks_async() for app in raiden_network[1:]
]
gevent.wait(leaving_async, timeout=50)
assert not connection_managers[0].receiving_channels
assert all(
channel.state == CHANNEL_STATE_SETTLED
for channel in all_channels
)
def runner(args):
if args.command == "file":
jobs = []
files = frozenset()
for fn in args.file:
files = files.union(glob.iglob(fn))
logger.info("Processing files: %s", files)
for f in files:
argsCopy = copy.deepcopy(args)
with open(f, "r", newline="\n") as fh:
argsCopy.file = fh
contentArgs = ContentArgParser(fh)
contentArgs.updateArgs(argsCopy)
logger.debug("Updated args: %s", argsCopy)
jobs.append(gevent.spawn(processItem, argsCopy, contentArgs))
gevent.wait(jobs)
return 0
else:
return processItem(args)
def main():
parser = argparse.ArgumentParser(description="Haproxy agent check service")
parser.add_argument("-c", "--config",
default="/etc/herald/config.yml",
type=str,
help="path to yaml configuraion file")
parser.add_argument("-b", "--bind",
default='0.0.0.0',
type=str,
help="listen address")
parser.add_argument("-p", "--port",
default=5555,
type=int,
help="listen port")
parser.add_argument("-l", "--loglevel",
default='info',
choices=['info', 'warn', 'debug', 'critical'],
type=str,
help="set logging level")
args = parser.parse_args()
setup_logging(args)
config = load_configuration(args.config)
all_plugins = load_all_plugins(config['plugins_dir'])
plugin = load_plugin(all_plugins, config['plugins'])
start_plugin(plugin)
server = start_server(args, config, plugin)
setup_handlers(server, plugin)
gevent.wait()
def amqp_consume(self):
"""Block on AMQP channel messages until an exception raises"""
log.info("%s: Starting AMQP consumer.", self.lbl)
try:
while True:
self.chan.wait()
except BaseException as exc:
log.error("%s: AMQP consumer exception %r, stopping.",
self.lbl, exc)
self.close_chan()
self.close_conn()
def main(args=None, workers=None, client=EchoHubClient, worker_kwargs=None):
gevent.monkey.patch_all()
args = args if args else prepare_argparse().parse_args()
prepare_logging(args.verbose or 1)
if args.mode == 'server':
hub = HubServer(workers=workers)
elif args.mode == 'client':
hub = client(worker_kwargs=worker_kwargs)
else:
raise Exception("Unknown mode '%s'." % args.mode)
def sig_handler(sig=None, frame=None):
log.warning("Hub process received SIGTERM/SIGINT")
hub.stop()
log.info("Sig handler completed.")
gevent.signal(signal.SIGTERM, sig_handler)
gevent.signal(signal.SIGINT, sig_handler) # KeyboardInterrupt also
hub.start()
gevent.wait()
def chat(sockets, pubsub):
chat = sockets.ChatBackend()
chat.pubsub = pubsub
yield chat
gevent.wait()
def test_heartbeat(self, chat, sockets):
client = Mock()
client.closed = False
chat.send = Mock()
sockets.HEARTBEAT_DELAY = 1
gevent.spawn(chat.heartbeat, client)
gevent.sleep(2)
client.closed = True
gevent.wait()
chat.send.assert_called_with(client, 'ping')
def retry(protocol, data, receiver_address, event_stop, timeout_backoff):
""" Send data until it's acknowledged.
    Exits when the first of the following happens:
    - The packet is acknowledged.
    - `event_stop` is set.
    - The iterator `timeout_backoff` runs out of values.
Returns:
bool: True if the message was acknowledged, False otherwise.
"""
async_result = protocol.send_raw_with_result(
data,
receiver_address,
)
event_quit = event_first_of(
async_result,
event_stop,
)
for timeout in timeout_backoff:
if event_quit.wait(timeout=timeout) is True:
break
protocol.send_raw_with_result(
data,
receiver_address,
)
return async_result.ready()
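# A standalone sketch of the retry loop above: send once, then wait on an
# "acknowledged" event with a growing timeout and resend whenever the wait expires,
# stopping as soon as the event fires or the backoff sequence is exhausted.
# `acknowledged` and `resend` are illustrative stand-ins for the protocol's async
# result and send_raw_with_result().
import gevent
from gevent.event import Event

acknowledged = Event()
gevent.spawn_later(0.25, acknowledged.set)  # simulate the ack arriving after a while

def resend():
    print('no ack yet, sending again')

print('initial send')
for timeout in [0.1, 0.2, 0.4, 0.8]:        # the timeout backoff sequence
    if acknowledged.wait(timeout=timeout):  # True as soon as the event is set
        break
    resend()

print('acknowledged:', acknowledged.is_set())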
def wait_recovery(event_stop, event_healthy):
event_first_of(
event_stop,
event_healthy,
).wait()
if event_stop.is_set():
return
# There may be multiple threads waiting, do not restart them all at
# once to avoid message flood.
gevent.sleep(random.random())
def send_and_wait(self, receiver_address, message, timeout=None):
"""Sends a message and wait for the response ack."""
async_result = self.send_async(receiver_address, message)
return async_result.wait(timeout=timeout)
def leave_all_token_networks_async(self):
token_addresses = self.token_to_channelgraph.keys()
leave_results = []
for token_address in token_addresses:
try:
connection_manager = self.connection_manager_for_token(token_address)
        except InvalidAddress:
            # skip tokens without a connection manager instead of reusing a stale one
            continue
        leave_results.append(connection_manager.leave_async())
combined_result = AsyncResult()
gevent.spawn(gevent.wait, leave_results).link(combined_result)
return combined_result
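# The method above folds many per-token results into one: a helper greenlet runs
# gevent.wait() over the individual results and is link()ed to a single AsyncResult
# (an AsyncResult is callable, so it can serve as a link target). A minimal
# self-contained sketch of that pattern, with the per-token results simulated:
import gevent
from gevent.event import AsyncResult

parts = [AsyncResult() for _ in range(3)]
for delay, part in enumerate(parts):
    gevent.spawn_later(0.05 * delay, part.set, delay)  # results trickle in over time

combined = AsyncResult()
gevent.spawn(gevent.wait, parts).link(combined)  # combined fires once every part is ready
combined.wait()
print([part.get() for part in parts])            # [0, 1, 2]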
def test_throughput(apps, tokens, num_transfers, amount):
def start_transfers(curr_app, curr_token, num_transfers):
graph = curr_app.raiden.token_to_channelgraph[curr_token]
all_paths = graph.get_paths_of_length(
source=curr_app.raiden.address,
num_hops=2,
)
path = all_paths[0]
target = path[-1]
api = curr_app.raiden.api
events = list()
for i in range(num_transfers):
async_result = api.transfer_async(
curr_token,
amount,
target,
1) # TODO: fill in identifier
events.append(async_result)
return events
finished_events = []
# Start all transfers
start_time = time.time()
for idx, curr_token in enumerate(tokens):
curr_app = apps[idx]
finished = start_transfers(curr_app, curr_token, num_transfers)
finished_events.extend(finished)
# Wait until the transfers for all tokens are done
gevent.wait(finished_events)
elapsed = time.time() - start_time
completed_transfers = num_transfers * len(tokens)
tps = completed_transfers / elapsed
print('Completed {} transfers {:.5} tps / {:.5}s'.format(completed_transfers, tps, elapsed))
def stop_and_wait(self):
self.stop_event.set(True)
    gevent.wait([self])  # gevent.wait() expects an iterable of waitables
def stop(self):
self.stop_signal = True
self.greenlets.append(self.echo_worker_greenlet)
gevent.wait(self.greenlets)
def flood_to(self, subscriptions):
jobs = [
gevent.spawn(self._pull_from, subscription)
for subscription in subscriptions
]
gevent.wait(jobs)
def main():
server = MixedTCPServer(LISTEN_PORT, SS_PORT)
gevent.signal(signal.SIGTERM, server.close)
gevent.signal(signal.SIGINT, server.close)
server.start()
gevent.wait()
def main():
args = sys.argv[1:]
if len(args) != 2:
sys.exit('Usage: %s source-address destination-address' % __file__)
source = args[0]
dest = parse_address(args[1])
server = PortForwarder(source, dest)
log('Starting port forwarder %s:%s -> %s:%s', *(server.address[:2] + dest))
gevent.signal(signal.SIGTERM, server.close)
gevent.signal(signal.SIGINT, server.close)
server.start()
gevent.wait()
def get(self, timeout=None):
return gevent.wait(self._parts, timeout=timeout)
def wait(self, timeout=None):
    gevent.joinall(self._parts, timeout=timeout)
def main(sysargv=sys.argv):
args = parse_args(sysargv[1:])
print(args)
jobs = [gevent.spawn(convert, args.file, args.stdin, i)
for i in range(args.count)]
gevent.wait(jobs)
def amqp_consume(self):
"""Connect to Hub Server and set up and start AMQP consumer"""
# define callback queue
self.queue = self.chan.queue_declare(exclusive=True).queue
self.chan.queue_bind(self.queue, self.exchange, self.queue)
self.chan.basic_consume(self.queue, callback=self.amqp_handle_msg,
no_ack=True)
log.debug("%s: Initialized amqp connection, channel, queue.", self.lbl)
# send rpc request
self.worker_id = None
self.correlation_id = uuid.uuid4().hex
reply_to = self.queue
routing_key = '%s.worker.%s' % (self.key, self.worker_type)
msg = amqp.Message(json.dumps(self.worker_kwargs),
correlation_id=self.correlation_id,
reply_to=reply_to,
content_type='application/json')
self.amqp_send_msg(msg, routing_key)
log.info("%s: sent RPC request, will wait for response.", self.lbl)
# wait for rpc response
try:
while not self.worker_id:
log.debug("%s: Waiting for RPC response.", self.lbl)
self.chan.wait()
except BaseException as exc:
log.error("%s: Amqp consumer received %r while waiting for RPC "
"response. Stopping.", self.lbl, exc)
log.info("%s: Finished waiting for RPC response.", self.lbl)
super(HubClient, self).amqp_consume()
def _send(self, command, payload=None):
# send rpc request
if self.correlation_id:
raise Exception("Can't send second request while already waiting.")
self.response = None
self.correlation_id = uuid.uuid4().hex
routing_key = '%s.%s' % (self.key, command)
msg = amqp.Message(json.dumps(payload),
correlation_id=self.correlation_id,
reply_to=self.queue,
content_type='application/json')
log.debug("Sending AMQP msg with routing key '%s' and body %r.",
routing_key, msg.body)
self.chan.basic_publish(msg, self.exchange, routing_key)
log.info("Sent RPC request, will wait for response.")
# wait for rpc response
try:
while self.correlation_id:
log.debug("Waiting for RPC response.")
self.chan.wait()
except BaseException as exc:
log.error("Amqp consumer received %r while waiting for RPC "
"response. Stopping.", exc)
log.info("Finished waiting for RPC response.")
response = self.response
self.response = None
return response
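# _send() above does request/response over AMQP with a correlation id: publish the
# request, then block until the consumer callback that matches the id clears it.
# A gevent-only sketch of the same correlation idea, with the broker round-trip
# replaced by a spawned greenlet; `pending` and `fake_reply` are hypothetical names,
# not part of the original class.
import uuid
import gevent
from gevent.event import AsyncResult

pending = {}                                   # correlation_id -> AsyncResult

def fake_reply(correlation_id):
    gevent.sleep(0.05)                         # simulate the broker round-trip
    pending[correlation_id].set({'ok': True})  # what the consumer callback would do

correlation_id = uuid.uuid4().hex
pending[correlation_id] = AsyncResult()
gevent.spawn(fake_reply, correlation_id)
print(pending[correlation_id].get(timeout=1))  # {'ok': True}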
def test_component_spec():
assert GenerateTestData.get_spec() == {
'name': 'tests.components/GenerateTestData',
'description': '"Generates stream of packets under control of a counter',
'inPorts': [
{
'addressable': False,
'description': '',
'id': 'wait',
'required': False,
'type': 'bang'
},
{
'addressable': False,
'default': 1,
'description': 'Count of packets to be generated',
'id': 'COUNT',
'required': False,
'schema': {'type': 'int'}
}
],
'outPorts': [
{
'addressable': False,
'description': '',
'id': 'done',
'required': False,
'type': 'bang'
},
{
'addressable': False,
'description': 'Generated stream',
'id': 'OUT',
'required': False,
'schema': {'type': 'string'}
}
],
'subgraph': False
}
def serve_runtime(runtime=None, host=DEFAULTS['host'], port=DEFAULTS['port'],
registry_host=DEFAULTS['registry_host'],
registry_port=DEFAULTS['registry_port']):
runtime = runtime if runtime is not None else Runtime()
address = 'ws://{}:{:d}'.format(host, port)
def runtime_application_task():
"""
This greenlet runs the websocket server that responds to remote commands
that inspect/manipulate the Runtime.
"""
print('Runtime listening at {}'.format(address))
WebSocketRuntimeApplication.runtimes[port] = runtime
try:
r = geventwebsocket.Resource(
OrderedDict([('/', WebSocketRuntimeApplication)]))
s = geventwebsocket.WebSocketServer(('', port), r)
s.serve_forever()
finally:
WebSocketRuntimeApplication.runtimes.pop(port)
def local_registration_task():
"""
This greenlet will run the rill registry to register the runtime with
the ui.
"""
from rill.registry import serve_registry
serve_registry(registry_host, registry_port, host, port)
tasks = [runtime_application_task, local_registration_task]
# Start!
gevent.wait([gevent.spawn(t) for t in tasks])
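# serve_runtime() above spawns its two tasks and parks the caller in gevent.wait()
# until both are done. A minimal sketch of that "spawn everything, then wait on the
# whole list" pattern; task_a/task_b are placeholders for the websocket and registry
# greenlets, not part of rill itself.
import gevent

def task_a():
    gevent.sleep(0.1)  # stand-in for the websocket runtime application

def task_b():
    gevent.sleep(0.2)  # stand-in for the registry registration task

gevent.wait([gevent.spawn(task) for task in (task_a, task_b)])  # returns once both exit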
def _yield_to_others(sleep):
    if any(gevent.monkey.is_module_patched(mod)
           for mod in ["socket", "subprocess"]):
gevent.wait(timeout=sleep)
else:
time.sleep(sleep)
def _run(self):
# in_cpubound_thread is sentinel to prevent double thread dispatch
thread_ctx = threading.local()
thread_ctx.in_cpubound_thread = True
try:
        self.in_async = gevent.get_hub().loop.async()  # renamed to loop.async_() in gevent >= 1.3
self.in_q_has_data = gevent.event.Event()
self.in_async.start(self.in_q_has_data.set)
while not self.stopping:
if not self.in_q:
# wait for more work
self.in_q_has_data.clear()
self.in_q_has_data.wait()
continue
# arbitrary non-preemptive service discipline can go here
# FIFO for now, but we should experiment with others
jobid, func, args, kwargs = self.in_q.popleft()
start_time = arrow.now()
try:
with db.cleanup_session():
self.results[jobid] = func(*args, **kwargs)
except Exception as e:
log.exception("Exception raised in cpubound_thread:")
self.results[jobid] = self._Caught(e)
finished_time = arrow.now()
run_delta = finished_time - start_time
log.d("Function - '{}'\n".format(func.__name__),
"\tRunning time: {}\n".format(run_delta),
"\tJobs left:", len(self.in_q),
)
self.out_q.append(jobid)
self.out_async.send()
except BaseException:
self._error()
# this may always halt the server process
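# The thread dispatcher above uses async watchers so the gevent thread and the
# CPU-bound worker thread can wake each other (in_async/out_async plus an Event).
# A minimal sketch of that cross-thread wake-up, assuming a reasonably recent gevent
# where the watcher constructor is spelled loop.async_():
import threading
import gevent
from gevent.event import Event

has_data = Event()
watcher = gevent.get_hub().loop.async_()  # spelled loop.async() on very old gevent
watcher.start(has_data.set)               # callback runs in the hub when send() is called

threading.Thread(target=watcher.send).start()  # a plain thread pokes the event loop
has_data.wait()                                # the waiting greenlet wakes cooperatively
print('woken from another thread')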
def apply(self, func, args, kwargs):
done = gevent.event.Event()
self.in_q.append((done, func, args, kwargs))
while not self.in_async:
gevent.sleep(0.01) # poll until worker thread has initialized
self.in_async.send()
done.wait()
res = self.results[done]
del self.results[done]
if isinstance(res, self._Caught):
raise res.err
return res