def debug_status(self, event):
table = MessageTable()
table.set_header('Metric', 'Value')
table.add('Guilds', len(self.state.guilds))
table.add('Channels', len(self.state.channels))
table.add('Users', len(self.state.users))
try:
import psutil
memory = psutil.Process().memory_info()
table.add('Memory RSS', sizeof_fmt(memory.rss))
table.add('Memory VMS', sizeof_fmt(memory.vms))
except ImportError:
# psutil is an optional dependency; skip the memory rows when it is missing.
pass
table.add('Greenlets', gevent.get_hub().loop.activecnt)
event.msg.reply(table.compile())
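# `sizeof_fmt` used above is not part of this snippet; it is assumed to be a
# human-readable byte formatter. A minimal sketch of such a helper:
def sizeof_fmt(num, suffix='B'):
    # Step through binary prefixes until the value drops below 1024.
    for unit in ('', 'Ki', 'Mi', 'Gi', 'Ti'):
        if abs(num) < 1024.0:
            return '%3.1f %s%s' % (num, unit, suffix)
        num /= 1024.0
    return '%.1f %s%s' % (num, 'Pi', suffix)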
def test_threadpool_resolver_mp(self):
h = gevent.get_hub()
# Touching these hub properties forces the threadpool and DNS resolver to be
# created in the parent before the child process is forked below.
t = h.threadpool
r = h.resolver
p = start_process(target=complchild_test_threadpool_resolver_mp)
p.join(timeout=1)
assert p.exitcode == 0
def complchild_test_threadpool_resolver_mp():
h = gevent.get_hub()
# The forked child must be able to initialise its own threadpool and resolver.
t = h.threadpool
r = h.resolver
def start(self):
# Start grabbing SIGCHLD within libev event loop.
gevent.get_hub().loop.install_sigchld()
# Run new process (based on `fork()` on POSIX-compliant systems).
super(_GProcess, self).start()
# The occurrence of SIGCHLD is recorded asynchronously in libev.
# This guarantees proper behavior even if the child watcher is
# started after the child exits. Start child watcher now.
self._sigchld_watcher = gevent.get_hub().loop.child(self.pid)
self._returnevent = gevent.event.Event()
self._sigchld_watcher.start(
self._on_sigchld, self._sigchld_watcher)
log.debug("SIGCHLD watcher for %s started.", self.pid)
def get(self, timeout=None):
"""Receive, decode and return data from the pipe. Block
gevent-cooperatively until data is available or timeout expires. The
default decoder is ``pickle.loads``.
:arg timeout: ``None`` (default) or a ``gevent.Timeout``
instance. The timeout must be started to take effect and is
canceled when the first byte of a new message arrives (i.e.
providing a timeout does not guarantee that the method completes
within the timeout interval).
:returns: a Python object.
Raises:
- :exc:`gevent.Timeout` (if provided)
- :exc:`GIPCError`
- :exc:`GIPCClosed`
- :exc:`pickle.UnpicklingError`
Recommended usage for silent timeout control::
with gevent.Timeout(TIME_SECONDS, False) as t:
reader.get(timeout=t)
.. warning::
The timeout control is currently not available on Windows,
because Windows can't apply select() to pipe handles.
An ``OSError`` is expected to be raised in case you set a
timeout.
"""
self._validate()
with self._lock:
if timeout:
# Wait for ready-to-read event.
h = gevent.get_hub()
h.wait(h.loop.io(self._fd, 1))
timeout.cancel()
msize, = struct.unpack("!i", self._recv_in_buffer(4).getvalue())
bindata = self._recv_in_buffer(msize).getvalue()
return self._decoder(bindata)
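# Minimal usage sketch for the timeout behaviour documented above, assuming a
# reader handle obtained from gipc.pipe(). The "silent" Timeout (second
# argument False) is raised inside get() but swallowed by the with-block, so
# the helper simply returns None when nothing arrives in time.
import gevent

def read_with_timeout(reader, seconds=5.0):
    with gevent.Timeout(seconds, False) as t:
        return reader.get(timeout=t)
    return None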
def monkey_patch_for_gevent():
import functools, gevent
apply_e = gevent.get_hub().threadpool.apply_e
def monkey_patch(func):
@functools.wraps(func)
def wrap(*args, **kwargs):
#if DEBUG:print('%s called with %s %s' % (func, args, kwargs))
return apply_e(Exception, func, args, kwargs)
return wrap
for attr in dir(ODBC_API):
if attr.startswith('SQL') and hasattr(getattr(ODBC_API, attr), 'argtypes'):
setattr(ODBC_API, attr, monkey_patch(getattr(ODBC_API, attr)))
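# ThreadPool.apply_e is an older gevent API that was deprecated and later
# removed. On current gevent releases the same wrapper can be written with
# threadpool.apply, which runs the blocking call in a worker thread and
# re-raises any exception in the calling greenlet (sketch; same ODBC_API
# assumption as the snippet above):
def monkey_patch_for_gevent_apply():
    import functools, gevent
    threadpool = gevent.get_hub().threadpool
    def monkey_patch(func):
        @functools.wraps(func)
        def wrap(*args, **kwargs):
            return threadpool.apply(func, args, kwargs)
        return wrap
    for attr in dir(ODBC_API):
        if attr.startswith('SQL') and hasattr(getattr(ODBC_API, attr), 'argtypes'):
            setattr(ODBC_API, attr, monkey_patch(getattr(ODBC_API, attr)))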
def enable_greenlet_debugger():
def _print_exception(self, context, type_, value, traceback):
ultratb.VerboseTB(call_pdb=True)(type_, value, traceback)
resp = raw_input('Debugger exited. Do you want to quit raiden? [Y/n] ').strip().lower()
if not resp or resp.startswith('y'):
os.kill(os.getpid(), signal.SIGTERM)
gevent.get_hub().__class__.print_exception = _print_exception
def _notify_waiter(self, service_name, action):
"""
????service_name????waiter
:param service_name:
:param action: ??????????constatn.SERVICE_ACTION
:type service_name: str
:type action: str
:return:
"""
for waiter in self._waiter_dict[service_name]:
gevent.get_hub().loop.run_callback(lambda: waiter.switch(action))
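# Hedged sketch of the waiting side that _notify_waiter wakes up (this helper
# is not in the original): a gevent Waiter parks the calling greenlet, and
# waiter.get() returns the action value once the hub runs waiter.switch(action)
# from the callback scheduled above.
def _wait_for_service(self, service_name):
    from gevent.hub import Waiter
    waiter = Waiter()
    self._waiter_dict[service_name].append(waiter)
    try:
        # Blocks cooperatively until _notify_waiter switches us back.
        return waiter.get()
    finally:
        self._waiter_dict[service_name].remove(waiter)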
#### service section ####
def __init__(self, gateway_url, client):
self._gateway_url = gateway_url
self._client = client
self._status = Status.DISCONNECTED
self._ws = None
self._ws_greenlet = None
self._heartbeat_greenlet = None
self._loop = gevent.get_hub().loop
self._seq = None
## Todo: use erlpack
self._encode = json.dumps
self._decode = json.loads
def apply_patch(hogging_detection=False, real_threads=1):
_logger.info('applying gevent patch (%s real threads)', real_threads)
# real_threads is 1 by default so it will be possible to run watch_threads concurrently
if hogging_detection:
real_threads += 1
if real_threads:
_RealThreadsPool(real_threads)
_patch_module_locks()
import gevent
import gevent.monkey
for m in ["easypy.threadtree", "easypy.concurrency"]:
assert m not in sys.modules, "Must apply the gevent patch before importing %s" % m
gevent.monkey.patch_all(Event=True, sys=True)
_unpatch_logging_handlers_lock()
global HUB
HUB = gevent.get_hub()
global threading
import threading
for thread in threading.enumerate():
_set_thread_uuid(thread.ident)
_set_main_uuid() # the patched threading has a new ident for the main thread
# this will declutter the thread dumps from gevent/greenlet frames
from .threadtree import _BOOTSTRAPPERS
import gevent, gevent.threading, gevent.greenlet
_BOOTSTRAPPERS.update([gevent, gevent.threading, gevent.greenlet])
if hogging_detection:
import greenlet
greenlet.settrace(lambda *args: _greenlet_trace_func(*args))
defer_to_thread(detect_hogging, 'detect-hogging')
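# Hedged sketch of a hogging-detection trace function in the spirit of the
# _greenlet_trace_func referenced above (the real one is not shown here).
# greenlet.settrace delivers ('switch', (origin, target)) and
# ('throw', (origin, target)) events, so stamping the time of every switch
# lets a watchdog thread flag greenlets that hold the hub for too long.
import time

_last_switch_time = None

def _example_greenlet_trace(event, args):
    global _last_switch_time
    if event in ('switch', 'throw'):
        _last_switch_time = time.monotonic()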
def __init__(self, qualifier=None, time_bucket=None):
self._event_queue = collections.deque()
# Set to True when the main loop is actively processing the input
# queue or has been scheduled to do so. Set to False when the loop
# runs out of work and switches to the Hub to wait for more.
self._scheduled = True
# (Monotonic time) timestamp of last schedule.
self._last_scheduled = None
# Cache the gevent Hub and main loop.
self._gevent_hub = gevent.get_hub()
self._gevent_loop = self._gevent_hub.loop
self.greenlet = TimedGreenlet(
self._loop,
time_bucket=time_bucket or self.__class__.__name__
)
self._op_count = 0
self._current_msg = None
self.started = False
# Message being processed; purely for logging.
self.msg_id = None
# Logging parameters
self.qualifier = qualifier
if qualifier:
self.name = "%s(%s)" % (self.__class__.__name__, qualifier)
else:
self.name = self.__class__.__name__
# Can't use str(self) yet, it might not be ready until subclass
# constructed.
_log.info("%s created.", self.name)
def __init__(self, qualifier=None):
self._event_queue = collections.deque()
# Set to True when the main loop is actively processing the input
# queue or has been scheduled to do so. Set to False when the loop
# runs out of work and switches to the Hub to wait for more.
self._scheduled = True
# (Monotonic time) timestamp of last schedule.
self._last_scheduled = None
# Cache the gevent Hub and main loop.
self._gevent_hub = gevent.get_hub()
self._gevent_loop = self._gevent_hub.loop
self.greenlet = gevent.Greenlet(self._loop)
self._op_count = 0
self._current_msg = None
self.started = False
# Message being processed; purely for logging.
self.msg_id = None
# Logging parameters
self.qualifier = qualifier
if qualifier:
self.name = "%s(%s)" % (self.__class__.__name__, qualifier)
else:
self.name = self.__class__.__name__
# Can't use str(self) yet, it might not be ready until subclass
# constructed.
_log.info("%s created.", self.name)
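# Hedged sketch of the _loop method these constructors hand to the greenlet
# (the real implementation is not shown here, and _dispatch is a hypothetical
# per-message handler): drain the deque, then mark the actor unscheduled and
# park in the hub until a sender reschedules it, e.g. via
# self._gevent_loop.run_callback(self.greenlet.switch).
def _example_loop(self):
    while True:
        if not self._event_queue:
            # No work left: note that we must be rescheduled, then yield to
            # the hub so the event loop and other greenlets can run.
            self._scheduled = False
            self._gevent_hub.switch()
            continue
        msg = self._event_queue.popleft()
        self._current_msg = msg
        self._op_count += 1
        self._dispatch(msg)
        self._current_msg = None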
def __init__(self):
self.in_q = collections.deque()
self.out_q = collections.deque()
self.in_async = None
# Note: loop.async() is the pre-Python-3.7 spelling; newer gevent exposes
# the same watcher as loop.async_().
self.out_async = gevent.get_hub().loop.async()
self.out_q_has_data = gevent.event.Event()
self.out_async.start(self.out_q_has_data.set)
self.worker = threading.Thread(target=self._run)
self.worker.daemon = True
self.stopping = False
self.results = {}
# start running thread / greenlet after everything else is set up
self.worker.start()
self.notifier = gevent.spawn(self._notify)
def _run(self):
# in_cpubound_thread is a sentinel used to prevent double thread dispatch
thread_ctx = threading.local()
thread_ctx.in_cpubound_thread = True
try:
self.in_async = gevent.get_hub().loop.async()
self.in_q_has_data = gevent.event.Event()
self.in_async.start(self.in_q_has_data.set)
while not self.stopping:
if not self.in_q:
# wait for more work
self.in_q_has_data.clear()
self.in_q_has_data.wait()
continue
# arbitrary non-preemptive service discipline can go here
# FIFO for now, but we should experiment with others
jobid, func, args, kwargs = self.in_q.popleft()
start_time = arrow.now()
try:
with db.cleanup_session():
self.results[jobid] = func(*args, **kwargs)
except Exception as e:
log.exception("Exception raised in cpubound_thread:")
self.results[jobid] = self._Caught(e)
finished_time = arrow.now()
run_delta = finished_time - start_time
log.d("Function - '{}'\n".format(func.__name__),
"\tRunning time: {}\n".format(run_delta),
"\tJobs left:", len(self.in_q),
)
self.out_q.append(jobid)
self.out_async.send()
except BaseException:
self._error()
# this may always halt the server process
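# Hedged sketch of the _notify greenlet spawned in __init__ above (its real
# body is not shown): it mirrors _run on the gevent side, woken by
# out_async.send() from the worker thread; how a finished jobid reaches its
# caller (here self._done) is an assumption.
def _example_notify(self):
    while not self.stopping:
        if not self.out_q:
            # Wait until the worker thread signals out_async again.
            self.out_q_has_data.clear()
            self.out_q_has_data.wait()
            continue
        jobid = self.out_q.popleft()
        self._done(jobid, self.results.pop(jobid))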
def run(self, auto_join=False):
"""
Runs Dissonance, loading all the modules, starting the web service, and starting the adapter.
If auto_join=True, this function will not return, and will run until dissonance stops if starting dissonance from
outside of a greenlet.
"""
if self.running:
raise RuntimeError("Dissonance is already running!")
logger.info("Starting Dissonance v%s", self.version)
logger.info("Starting storage %s", self._storage)
self._storage.start()
logger.info("Loading modules")
self.modules.load_all()
if getattr(self.config, 'web', False) or str(self._opts.get('web', False)).upper() == 'TRUE':
self._web = Web(self, EnvFallbackDict('web', getattr(self.config, 'web_opts', {})))
self._web.start()
if getattr(self.config, 'manhole', False):
from gevent.backdoor import BackdoorServer
manhole_opts = EnvFallbackDict('manhole', getattr(self.config, 'manhole_opts', {}))
self._manhole = BackdoorServer((
manhole_opts.get('listen_host', '127.0.0.1'),
int(manhole_opts.get('listen_port', 9001))
), locals={
'client': self.client
})
self._manhole.start()
logger.info("Attempting to log in as %s" % self._opts['email'])
self.client.login(self._opts['email'], self._opts['password'])
logger.info("Starting connection to Discord")
self.client.start()
self._storage_sync_periodic.start(right_away=False)
self._stop_event.clear()
# If we are running in the main greenlet, we almost certainly never want to
# return; otherwise the main greenlet would exit and tear everything down
# with it. The hub's parent greenlet is the main greenlet, so this check
# tells us whether run() was called from it.
if auto_join and gevent.get_hub().parent == gevent.getcurrent():
self.join()
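# Hedged usage sketch: the constructor arguments are assumptions (they are not
# shown in this snippet); calling run(auto_join=True) from the main greenlet
# blocks until Dissonance stops.
if __name__ == '__main__':
    bot = Dissonance()          # hypothetical construction
    bot.run(auto_join=True)     # does not return while the bot is running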