def pump_messages(self):
    """Maintain a connection to the broker and handle incoming frames.

    This will never return, so it should be run from a separate greenlet.
    """
    while True:
        try:
            self._connect()
            LOG.info("connected")
            while self.connected:
                LOG.debug("pumping")
                self.connection.read_frames()
                gevent.sleep()
        except socket.error as exception:
            LOG.warning("connection failed: %s", exception)
            gevent.sleep(1)
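
# A minimal driver sketch for pump_messages() above: its docstring says it
# never returns, so it is assumed to be launched in its own greenlet. `client`
# here stands for a hypothetical instance of the class defining the method.
import gevent

pump_greenlet = gevent.spawn(client.pump_messages)
# ... do other work; the pump keeps reconnecting in the background ...
pump_greenlet.kill()  # stop pumping on shutdown
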
def connect(self) -> bool:
    """ Establish a long-running connection to EPMD; this will not return
        until the connection has been established.

        :return: True
    """
    while True:
        try:
            print("EPMD: Connecting %s:%d" % (self.host_, self.port_))
            host_port = (self.host_, self.port_)
            self.sock_ = socket.create_connection(address=host_port,
                                                  timeout=5.0)
            break  # the connect loop
        except socket.error as err:
            print("EPMD: connection error:", err)
            gevent.sleep(5)

    print("EPMD: Socket connected")
    return True
def worker(pattern, q):
    try:
        num = pattern["page_range"]
        for i in range(len(pattern["url"])):
            index = pattern["url"][i].find("%d")
            if index == -1:
                # No page placeholder in the URL: fetch it once.
                get_and_check(pattern["url"][i], pattern, q)
                gevent.sleep(10)
                continue
            # Otherwise substitute every page number into the URL template.
            for j in range(1, num + 1):
                url = pattern["url"][i] % j
                # log.debug("PID:%d url:%s" % (os.getpid(), url))
                get_and_check(url, pattern, q)
                gevent.sleep(10)
    except Exception as e:
        log.error("PID:%d proxy error:%s" % (os.getpid(), e))
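
# A hedged driver sketch for worker() above: the signature suggests a URL
# pattern dict and a shared result queue. The pattern contents, the URL, and
# the surrounding get_and_check()/log objects are assumptions based only on
# how they are used in the snippet.
import gevent
from gevent.queue import Queue

result_queue = Queue()
patterns = [
    {"url": ["http://example.com/proxies?page=%d"], "page_range": 5},
]
gevent.joinall([gevent.spawn(worker, p, result_queue) for p in patterns])
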
def heartbeat(self):
    """Heartbeat function.

    Every heartbeat_interval seconds, run the registered functions. This
    will capture unhandled exceptions and report them.
    """
    # Keep beating unless the WSGI worker is shutting down
    while self.is_alive():
        logger.debug('thump')
        self._heartbeat_beat_once()
        gevent.sleep(self.heartbeat_interval)
    logger.info('App stopped, so stopping heartbeat.')
    # We're at worker shutdown, so beat until all registered lifers are ok
    # with us shutting down
    while any([fun() for fun in _registered_lifers]):
        logger.debug('thump (finishing up)')
        self._heartbeat_beat_once()
        # Faster beat so we can shut down sooner
        gevent.sleep(1)
    logger.info('Everything completed.')
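
# A sketch of how work could hook into the shutdown gate above. It assumes
# `_registered_lifers` is the module-level list iterated by the final loop;
# the project's real registration helper is not shown here, so this is only
# illustrative.
_pending_jobs = []

def _still_working():
    # The shutdown loop keeps beating until every registered callable
    # returns a falsy value, so report True while work is outstanding.
    return bool(_pending_jobs)

_registered_lifers.append(_still_working)
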
def test_exitcode_previous_to_join(self):
    p = start_process(lambda: gevent.sleep(SHORTTIME))
    # Assume that the child process is still alive when the next
    # line is executed by the interpreter (there is no guarantee
    # for that, but it's rather likely).
    assert p.exitcode is None
    # Expect the child watcher mechanism to pick up
    # and process the child process termination event
    # (within at most two seconds). The `gevent.sleep()`
    # invocations allow for libev event loop iterations,
    # two of which are required after the OS delivers the
    # SIGCHLD signal to the parent process: one iteration
    # invokes the child reap loop, and the next invokes
    # the libev callback associated with the termination
    # event.
    deadline = time.time() + 2
    while time.time() < deadline:
        if p.exitcode is not None:
            assert p.exitcode == 0
            p.join()
            return
        gevent.sleep(ALMOSTZERO)
    raise Exception('Child termination not detected')
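
# The deadline/poll pattern used above generalizes to a small helper. This is
# a hedged standalone sketch, not part of the test suite; the timeout and
# interval defaults are arbitrary.
import time
import gevent

def wait_for(predicate, timeout=2.0, interval=0.001):
    """Poll `predicate` until it returns truthy or `timeout` seconds pass."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        gevent.sleep(interval)  # yield so the event loop can run its callbacks
    return False
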
def test_lock_out_of_context_single(self):
    r, w = pipe()
    g = gevent.spawn(lambda r: r.get(), r)
    gevent.sleep(SHORTTIME)
    with raises(GIPCLocked):
        with r:
            pass
            # The context manager can't close `r`, as it is locked in `g`.
    g.kill(block=False)
    # Ensure killing via 'context switch', i.e. yield control to other
    # coroutines (otherwise the subsequent close attempt will fail with
    # `GIPCLocked` error).
    gevent.sleep(-1)
    # Close the writer first; otherwise `os.close(r._fd)` would block on
    # Windows.
    w.close()
    r.close()
def test_lock_out_of_context_single(self):
    h1, h2 = pipe(True)
    g = gevent.spawn(lambda h: h.get(), h1)
    gevent.sleep(SHORTTIME)
    with raises(GIPCLocked):
        with h1:
            pass
            # Can't close h1 reader on exit, as it is locked in `g`.
    g.kill(block=False)
    # Ensure killing via 'context switch', i.e. yield control to other
    # coroutines (otherwise the subsequent close attempt may fail with
    # `GIPCLocked` error).
    gevent.sleep(-1)
    h2.close()  # Closes read and write handles of h2.
    assert h1._writer._closed
    assert not h1._reader._closed
    h1.close()  # Closes read handle; ignore that writer is already closed.
    assert h1._reader._closed
def test_lock_out_of_context_pair(self):
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            # Write more to the pipe than its buffer can hold
            # (makes `put` block when there is no reader).
            # The buffer is quite large on Windows.
            gw = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h1)
            gevent.sleep(SHORTTIME)
            # The context manager tries to close the h2 reader, the h2
            # writer, and the h1 writer first; it fails upon the latter
            # and must still close the h1 reader after that.
    assert not h1._writer._closed
    assert h1._reader._closed
    assert h2._writer._closed
    assert h2._reader._closed
    # Kill the greenlet (frees the lock on the h1 writer), close the h1 writer.
    gw.kill(block=False)
    gevent.sleep(-1)
    h1.close()
    assert h1._writer._closed
def test_lock_out_of_context_pair_3(self):
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            gr1 = gevent.spawn(lambda h: h.get(), h1)
            gr2 = gevent.spawn(lambda h: h.get(), h2)
            gevent.sleep(SHORTTIME)
            # The context manager succeeds in closing the h2 writer, fails
            # upon closing the h2 reader, then closes the h1 writer
            # (success) and the h1 reader (failure).
    assert not h2._reader._closed
    assert not h1._reader._closed
    assert h2._writer._closed
    assert h1._writer._closed
    gr1.kill(block=False)
    gr2.kill(block=False)
    gevent.sleep(-1)
    h2.close()
    h1.close()
def test_lock_out_of_context_pair_4(self):
    with raises(GIPCLocked):
        with pipe(True) as (h1, h2):
            # Write more to the pipe than its buffer can hold
            # (makes `put` block when there is no reader).
            # The buffer is quite large on Windows.
            gw1 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h1)
            gw2 = gevent.spawn(lambda h: h.put(LONGERTHANBUFFER), h2)
            gevent.sleep(SHORTTIME)
            # The context manager fails to close the h2 writer, succeeds in
            # closing the h2 reader, then fails on the h1 writer and
            # succeeds on the h1 reader.
    assert h2._reader._closed
    assert h1._reader._closed
    assert not h2._writer._closed
    assert not h1._writer._closed
    gw1.kill(block=False)
    gw2.kill(block=False)
    gevent.sleep(-1)
    h2.close()
    h1.close()
def privateInterpreter(self):
    """Trivial interpreter implementation; sends commands to the plac interpreter."""
    logging.info("Starting plain interpreter")
    char = line = ''
    try:
        while char != '\x1b':  # \x1b = escape character
            char = getChar()
            if char:
                line += char
                line += sys.stdin.readline()
            if '\n' in line:
                self.interpreter.execute([line[:-1]], verbose=True)  # '[:-1]' omits the trailing '\n'
                line = ''
                sys.stdout.write(">")
                sys.stdout.flush()
            gevent.sleep(0.1)
    except KeyboardInterrupt:
        pass
def _stop_client(self):
    """Best effort to stop the client."""
    try:
        # Make sure not to mistake this scenario for a failure to stop
        # the client.
        if self._client is None:
            log.info("Kazoo client is None.")
            return
        _retry((Exception,), tries=3, delay=1, backoff=2,
               sleep_func=gevent.sleep)(self._client.stop)()
        log.info("Successfully stopped kazoo client.")
    except (Exception, gevent.Timeout):
        self._sc.increment("errors.zk.client.stop.failure",
                           tags={'host': hostname},
                           sample_rate=1)
        log.exception("Failed to stop kazoo client.")
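
# `_retry` above is used as a decorator factory: it wraps `self._client.stop`
# and the trailing `()` invokes the wrapped call. A minimal sketch of such a
# factory is shown below; it is an illustration only, and the real `_retry`
# in the source project may behave differently (logging, jitter, etc.).
import functools
import time

def _retry_sketch(exceptions, tries=3, delay=1, backoff=2, sleep_func=time.sleep):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(tries):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == tries - 1:
                        raise
                    # Passing sleep_func=gevent.sleep keeps the wait cooperative.
                    sleep_func(wait)
                    wait *= backoff
        return wrapper
    return decorator
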
def test_serverset_destroy(self):
    testutil.initialize_kazoo_client_manager(ZK_HOSTS)
    client = KazooClientManager().get_client()
    server_set = ServerSet(ServerSetTestCase.SERVER_SET_DESTROY_PATH,
                           ZK_HOSTS,
                           waiting_in_secs=0.01)
    server_set.join(ServerSetTestCase.PORT_1, use_ip=False)
    server_set.join(ServerSetTestCase.PORT_2, use_ip=False)
    # Give the server set join some time to do its magic.
    gevent.sleep(1)
    server_set._destroy(ServerSetTestCase.END_POINT_1)
    gevent.sleep(1)
    children = client.get_children(
        ServerSetTestCase.SERVER_SET_DESTROY_PATH)
    for child in children:
        self.assertFalse(child.endswith(ServerSetTestCase.END_POINT_1))
def __init__(self, handler=SequentialGeventHandler(), hosts=None):
    self.handler = handler
    self.hosts = hosts
    self._state = KazooState.LOST
    self._listeners = []
    self.Party = partial(Party, self)
    self.ShallowParty = partial(ShallowParty, self)
    self.retry = KazooRetry(
        max_tries=3,
        delay=0.0,
        backoff=1,
        max_jitter=0.0,
        sleep_func=gevent.sleep
    )
    self.ChildrenWatch = partial(ChildrenWatch, self)
    self.DataWatch = partial(DataWatch, self)
    self._children_watches = {}
    self._data_watches = {}
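
# For context: KazooRetry instances are callable, and passing gevent.sleep as
# sleep_func makes the back-off waits cooperative instead of blocking the hub.
# A hedged standalone usage sketch (the client and the znode path are assumed):
import gevent
from kazoo.retry import KazooRetry

retry = KazooRetry(max_tries=3, delay=0.0, backoff=1, max_jitter=0.0,
                   sleep_func=gevent.sleep)
# data, stat = retry(client.get, "/some/znode")  # retries transient ZK errors
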
def _holddown_queue_wiper():
    # A greenlet which wakes up every X seconds to clean up
    # messages in the holddown queue. It either drops the event
    # or puts the event back onto the _NOTIFICATION_EVENT_QUEUE.
    while True:
        gevent.sleep(_NOTIFICATION_HOLDDOWN_WIPER_SLEEP_INTERVAL_IN_SECONDS)
        while not _NOTIFICATION_HOLDDOWN_QUEUE.empty():
            (zk_path, command, value, version, max_wait_in_secs,
             watch_type, notification_timestamp) \
                = _NOTIFICATION_HOLDDOWN_QUEUE.get()
            if (zk_path not in _PATH_TO_DATA) or notification_timestamp \
                    >= _PATH_TO_DATA[zk_path]['notification_timestamp']:
                _NOTIFICATION_EVENT_QUEUE.put(
                    (zk_path, command, value, version,
                     max_wait_in_secs, watch_type, notification_timestamp)
                )
def _check_local_session_state():
    global _ZK_SESSION_ID
    while True:
        client = _kazoo_client()
        log.info("Current zk session id %s", client._session_id)
        if _ZK_SESSION_ID is None:
            _ZK_SESSION_ID = client._session_id
        elif _ZK_SESSION_ID != client._session_id:
            log.warning("Zookeeper session changed from %s to %s",
                        _ZK_SESSION_ID, client._session_id)
            since_start = datetime.datetime.utcnow() - _START_TIME
            if since_start.total_seconds() > 180:
                _kill("Restart since ZK session changed")
        gevent.sleep(60)
#########################################################
####### Funcs dealing with MetaConfig/Dependencies ######
#########################################################
def test_gevent1(self):
    """Two greenlets explicitly yielding to each other via gevent.sleep(0)."""
    def foo():
        _log.info('Running in foo')
        gevent.sleep(0)
        _log.info('Explicit context switch to foo again')

    def bar():
        _log.info('Explicit context to bar')
        gevent.sleep(0)
        _log.info('Implicit context switch back to bar')

    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])
def test_greenlet(self):
    """Define a custom greenlet by subclassing gevent.Greenlet."""
    class MyGreenlet(gevent.Greenlet):
        def __init__(self, message, n):
            super(MyGreenlet, self).__init__()
            self.message = message
            self.n = n

        def _run(self):
            print(self.message)
            gevent.sleep(self.n)

    g1 = MyGreenlet("Hi there111!", 1)
    g1.start()
    g2 = MyGreenlet("Hi there222!", 2)
    g2.start()
    gevent.joinall([g1, g2])

# def test_shutdown(self):
#     def run_forever():
#         _log.info('run_forever start..')
#         gevent.sleep(1000)
#     gevent.signal(signal.SIGQUIT, gevent.kill)
#     thread = gevent.spawn(run_forever)
#     thread.join()
def test_event(self):
    """Use an Event to wake several waiting greenlets at once."""
    evt = Event()

    def setter():
        '''After 3 seconds, wake all greenlets waiting on the value of evt.'''
        _log.info('A: Hey wait for me, I have to do something')
        gevent.sleep(3)
        _log.info("Ok, I'm done")
        evt.set()

    def waiter():
        '''After 3 seconds the wait() call will unblock.'''
        _log.info("I'll wait for you")
        evt.wait()  # blocking
        _log.info("It's about time")

    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])
def chat(ws):
    """Relay chat messages to and from clients.
    """
    # Subscribe to messages on the specified channel.
    channel = request.args.get('channel')
    lag_tolerance_secs = float(request.args.get('tolerance', 0.1))
    chat_backend.subscribe(ws, channel)
    # Send a heartbeat ping every 30s
    # so Heroku won't close the connection.
    gevent.spawn(chat_backend.heartbeat, ws)
    while not ws.closed:
        # Sleep to prevent *constant* context switches.
        gevent.sleep(lag_tolerance_secs)
        # Publish messages from the client.
        message = ws.receive()
        if message is not None:
            channel, data = message.split(':', 1)
            conn.publish(channel, data)
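
# The heartbeat greenlet spawned above is not shown in this snippet. A
# hypothetical version matching the "ping every 30s" comment could look like
# the sketch below; it assumes a geventwebsocket-style `ws` object with
# `closed` and `send()`, as used in chat() above.
import gevent

def heartbeat_sketch(ws, interval=30):
    while not ws.closed:
        gevent.sleep(interval)
        try:
            ws.send('ping')  # any small frame keeps the connection alive
        except Exception:
            break  # client went away; stop pinging
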
def wait_for_sync_etherscan(blockchain_service, url, tolerance, sleep):
    local_block = blockchain_service.client.block_number()
    etherscan_block = etherscan_query_with_retries(url, sleep)
    syncing_str = 'Syncing ... Current: {} / Target: ~{}'

    if local_block >= etherscan_block - tolerance:
        return

    print('Waiting for the ethereum node to synchronize. [Use ^C to exit]')
    print(syncing_str.format(local_block, etherscan_block), end='')

    for i in count():
        sys.stdout.flush()
        gevent.sleep(sleep)
        local_block = blockchain_service.client.block_number()

        # update the oracle block number sparsely to not spam the server
        if local_block >= etherscan_block or i % 50 == 0:
            etherscan_block = etherscan_query_with_retries(url, sleep)
            if local_block >= etherscan_block - tolerance:
                return

        print(CLEARLINE + CURSOR_STARTLINE, end='')
        print(syncing_str.format(local_block, etherscan_block), end='')
def wait_for_sync_rpc_api(blockchain_service, sleep):
    if blockchain_service.is_synced():
        return

    print('Waiting for the ethereum node to synchronize [Use ^C to exit].')

    for i in count():
        if i % 3 == 0:
            print(CLEARLINE + CURSOR_STARTLINE, end='')

        print('.', end='')
        sys.stdout.flush()

        gevent.sleep(sleep)

        if blockchain_service.is_synced():
            return
def check_node_connection(func):
    """A decorator to reconnect if the connection to the node is lost."""
    def retry_on_disconnect(self, *args, **kwargs):
        for i, timeout in enumerate(timeout_two_stage(10, 3, 10)):
            try:
                result = func(self, *args, **kwargs)
                if i > 0:
                    log.info('Client reconnected')
                return result
            except (requests.exceptions.ConnectionError, InvalidReplyError):
                log.info(
                    'Timeout in eth client connection to {}. Is the client offline? Trying '
                    'again in {}s.'.format(self.transport.endpoint, timeout)
                )
            gevent.sleep(timeout)

    return retry_on_disconnect
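
# A hedged usage sketch for the decorator above: it wraps methods on a client
# object that exposes `transport.endpoint` (as the log message assumes). The
# class name, the method, and `transport.request()` are illustrative only.
class JsonRpcClientSketch:
    def __init__(self, transport):
        self.transport = transport

    @check_node_connection
    def block_number(self):
        # Any call that may raise requests.exceptions.ConnectionError is
        # retried with the two-stage back-off from the decorator.
        return self.transport.request('eth_blockNumber')
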
def test_healthcheck_with_unconnected_node(raiden_network, nat_keepalive_timeout):
    """ Nodes start in the unknown network state. """
    app0, app1 = raiden_network  # pylint: disable=unbalanced-tuple-unpacking

    address0 = app0.raiden.address
    address1 = app1.raiden.address

    assert app0.raiden.protocol.nodeaddresses_networkstatuses[address1] == NODE_NETWORK_UNKNOWN
    assert app1.raiden.protocol.nodeaddresses_networkstatuses[address0] == NODE_NETWORK_UNKNOWN

    app0.raiden.start_health_check_for(address1)
    gevent.sleep(nat_keepalive_timeout)

    assert app0.raiden.protocol.nodeaddresses_networkstatuses[address1] == NODE_NETWORK_REACHABLE
    assert app1.raiden.protocol.nodeaddresses_networkstatuses[address0] == NODE_NETWORK_UNKNOWN
def test_healthcheck_with_bad_peer(raiden_network, nat_keepalive_retries, nat_keepalive_timeout):
    """ If the Ping messages are not answered, the node must be set to
    unreachable.
    """
    app0, app1 = raiden_network  # pylint: disable=unbalanced-tuple-unpacking

    address0 = app0.raiden.address
    address1 = app1.raiden.address

    assert app0.raiden.protocol.nodeaddresses_networkstatuses[address1] == NODE_NETWORK_REACHABLE
    assert app1.raiden.protocol.nodeaddresses_networkstatuses[address0] == NODE_NETWORK_REACHABLE

    # Drop all Ping and Ack messages
    app0.raiden.protocol.transport.droprate = 1
    app1.raiden.protocol.transport.droprate = 1

    gevent.sleep(
        (nat_keepalive_retries + 2) * nat_keepalive_timeout
    )

    assert app0.raiden.protocol.nodeaddresses_networkstatuses[address1] == NODE_NETWORK_UNREACHABLE
    assert app1.raiden.protocol.nodeaddresses_networkstatuses[address0] == NODE_NETWORK_UNREACHABLE
def geth_create_account(datadir, privkey):
    """
    Create an account in `datadir` -- since we're not interested
    in the rewards, we don't care about the created address.

    Args:
        datadir (str): the datadir in which the account is created
        privkey (bytes): the private key to import
    """
    keyfile_path = os.path.join(datadir, 'keyfile')

    with open(keyfile_path, 'w') as handler:
        handler.write(hexlify(privkey))

    create = subprocess.Popen(
        ['geth', '--datadir', datadir, 'account', 'import', keyfile_path],
        stdin=subprocess.PIPE,
        universal_newlines=True
    )

    create.stdin.write(DEFAULT_PASSPHRASE + os.linesep)
    time.sleep(.1)
    create.stdin.write(DEFAULT_PASSPHRASE + os.linesep)
    create.communicate()
    assert create.returncode == 0
def send(self, sender, host_port, bytes_):
    # even dropped packets have to go through the throttle_policy
    gevent.sleep(self.throttle_policy.consume(1))

    if self.droprate:
        drop = self.network.counter % self.droprate == 0
    else:
        drop = False

    if not drop:
        self.network.send(sender, host_port, bytes_)
    else:
        # since this path won't go through super.send we need to call
        # track_send ourselves
        self.network.track_send(sender, host_port, bytes_)
        log.debug(
            'dropped packet',
            sender=pex(sender),
            counter=self.network.counter,
            msghash=pex(sha3(bytes_))
        )
def wait_for_listening_port(port_number, tries=10, sleep=0.1, pid=None):
    if pid is None:
        pid = os.getpid()

    for _ in range(tries):
        gevent.sleep(sleep)

        # macOS requires root access for the connections api to work,
        # so get connections of the current process only
        connections = psutil.Process(pid).connections()

        for conn in connections:
            if conn.status == 'LISTEN' and conn.laddr[1] == port_number:
                return

    raise RuntimeError('{port} is not bound'.format(port=port_number))


# TODO: Figure out why this fixture can't work as session scoped.
#       What happens is that after one test is done, in the next one
#       the server is no longer running even though the teardown has not
#       been invoked.
def echo_worker(self):
    """ The `echo_worker` works through the `self.received_transfers` queue and spawns
    `self.on_transfer` greenlets for all not-yet-seen transfers. """
    log.debug('echo worker', qsize=self.received_transfers.qsize())

    while self.stop_signal is None:
        if self.received_transfers.qsize() > 0:
            transfer = self.received_transfers.get()
            if transfer in self.seen_transfers:
                log.debug(
                    'duplicate transfer ignored',
                    initiator=pex(transfer['initiator']),
                    amount=transfer['amount'],
                    identifier=transfer['identifier']
                )
            else:
                self.seen_transfers.append(transfer)
                self.greenlets.append(gevent.spawn(self.on_transfer, transfer))
        else:
            gevent.sleep(.5)
def synchronous():
    # Event-based synchronization: one setter wakes all waiting greenlets.
    from gevent.event import Event

    evt = Event()

    def setter():
        print('A: Hey wait for me, I have to do something')
        gevent.sleep(3)
        print('Ok, I\'m done')
        evt.set()

    def waiter():
        print('I\'ll wait for you')
        evt.wait()
        print('It\'s about time')

    gevent.joinall([gevent.spawn(setter),
                    gevent.spawn(waiter),
                    gevent.spawn(waiter),
                    gevent.spawn(waiter),
                    gevent.spawn(waiter)
                    ])