def active(self):
"""
Returns the scoped ``Context`` for this execution flow. The ``Context``
uses the ``Greenlet`` class as a carrier, and everytime a greenlet
is created it receives the "parent" context.
"""
current_g = gevent.getcurrent()
ctx = getattr(current_g, CONTEXT_ATTR, None)
if ctx is not None:
# return the active Context for this greenlet (if any)
return ctx
# the Greenlet doesn't have a Context so it's created and attached
# even to the main greenlet. This is required in Distributed Tracing
# when a new arbitrary Context is provided.
if current_g:
ctx = Context()
setattr(current_g, CONTEXT_ATTR, ctx)
return ctx
python类getcurrent()的实例源码
def __init__(self, *args, **kwargs):
# get the current Context if available
current_g = gevent.getcurrent()
ctx = getattr(current_g, CONTEXT_ATTR, None)
# create the Greenlet as usual
super(TracedGreenlet, self).__init__(*args, **kwargs)
# the context is always available made exception of the main greenlet
if ctx:
# create a new context that inherits the current active span
# TODO: a better API for Context, should get the tuple at once
new_ctx = Context(
trace_id=ctx._parent_trace_id,
span_id=ctx._parent_span_id,
sampled=ctx._sampled,
)
new_ctx._current_span = ctx._current_span
setattr(self, CONTEXT_ATTR, new_ctx)
def _record_frame(self, frame):
if self.target_greenlet_id and id(gevent.getcurrent()) != self.target_greenlet_id:
return
now = timeit.default_timer()
if self.last_profile is not None:
if now - self.last_profile < self.interval:
return
self.last_profile = now
self.timestamps.append(int(1e6 * now))
stack = []
while frame is not None:
stack.append(self._format_frame(frame))
frame = frame.f_back
stack.reverse()
self.root.add(stack, self._id_generator)
self.samples.append(self.nextId)
def _native_runner(f):
def cleanup_wrapper(*args, **kwargs):
with db.cleanup_session():
r = f(*args, **kwargs)
return r
parent = weakref.proxy(gevent.getcurrent())
frame = sys._getframe()
def wrapper(*args, **kwargs):
if utils.get_context(None) is None:
g = gevent.getcurrent()
try:
g._hp_inherit(parent, frame)
except AttributeError:
async.Greenlet._hp_inherit(g, parent, frame)
return cleanup_wrapper(*args, **kwargs)
return wrapper
def f(n):
for i in range(n):
print gevent.getcurrent(), i
gevent.sleep(0)
def f(n):
for i in range(n):
print gevent.getcurrent(), i
gevent.sleep(1)
def get_data(url):
#print gevent.getcurrent()
print 'Get url {}'.format(url)
resp = requests.get(url)
print len(resp.text)
def test_main_greenlet(self):
# the main greenlet must not be affected by the tracer
main_greenlet = gevent.getcurrent()
ctx = getattr(main_greenlet, '__datadog_context', None)
ok_(ctx is None)
def test_main_greenlet_context(self):
# the main greenlet must have a ``Context`` if called
ctx_tracer = self.tracer.get_call_context()
main_greenlet = gevent.getcurrent()
ctx_greenlet = getattr(main_greenlet, '__datadog_context', None)
ok_(ctx_tracer is ctx_greenlet)
eq_(len(ctx_tracer._trace), 0)
def activate(self, context):
"""Sets the scoped ``Context`` for the current running ``Greenlet``.
"""
current_g = gevent.getcurrent()
if current_g is not None:
setattr(current_g, CONTEXT_ATTR, context)
return context
def execute(self, event):
"""
Executes a CommandEvent this plugin owns.
"""
if not event.command.oob:
self.greenlets.add(gevent.getcurrent())
try:
return event.command.execute(event)
except CommandError as e:
event.msg.reply(e.msg)
return False
finally:
self.ctx.drop()
def dispatch(self, typ, func, event, *args, **kwargs):
# Link the greenlet with our exception handler
gevent.getcurrent().link_exception(lambda g: self.handle_exception(g, event))
# TODO: this is ugly
if typ != 'command':
self.greenlets.add(gevent.getcurrent())
self.ctx['plugin'] = self
if hasattr(event, 'guild'):
self.ctx['guild'] = event.guild
if hasattr(event, 'channel'):
self.ctx['channel'] = event.channel
if hasattr(event, 'author'):
self.ctx['user'] = event.author
for pre in self._pre[typ]:
event = pre(func, event, args, kwargs)
if event is None:
return False
result = func(event, *args, **kwargs)
for post in self._post[typ]:
post(func, event, args, kwargs, result)
return True
def interval_host(host, time, f, *args, **kwargs):
'''
Creates an Event attached to the *host* for management that will
execute the *f* function every *time* seconds.
See example in :ref:`sample_inter`
:param Proxy host: proxy of the host. Can be obtained from inside a
class with ``self.host``.
:param int time: seconds for the intervals.
:param func f: function to be called every *time* seconds.
:param list args: arguments for *f*.
:return: :class:`Event` instance of the interval.
'''
def wrap(*args, **kwargs):
thread = getcurrent()
args = list(args)
stop_event = args[0]
del args[0]
args = tuple(args)
while not stop_event.is_set():
f(*args, **kwargs)
stop_event.wait(time)
host.detach_interval(thread)
t2_stop = Event()
args = list(args)
args.insert(0, t2_stop)
args = tuple(args)
t = spawn(wrap, *args, **kwargs)
thread_id = t
host.attach_interval(thread_id, t2_stop)
return t2_stop
def invoke(self, func, rpc_id, args, kwargs):
# put the process in the host list pthreads
self.host.pthreads[getcurrent()] = self.__actor.url
try:
result = func(*args, **kwargs)
except Exception, e:
result = e
self.__actor.receive_from_ask(result, rpc_id)
# remove the process from pthreads
del self.host.pthreads[getcurrent()]
def invoke(self, func, args, kwargs):
# put the process in the host list pthreads
self.host.pthreads[getcurrent()] = self.__actor.url
func(*args, **kwargs)
# remove the process from pthreads
del self.host.pthreads[getcurrent()]
def get_host():
if core_type == 'thread':
current = current_thread()
else:
current = getcurrent()
for host in hosts.values():
if current in host.threads.keys():
return host
elif current in host.pthreads.keys():
return host
return main_host
def get_current():
if core_type == 'thread':
current = current_thread()
else:
current = getcurrent()
for host in hosts.values():
if current in host.threads.keys():
return host.actors[host.threads[current]]
elif current in host.pthreads.keys():
return host.actors[host.pthreads[current]]
def greenlet_id(self):
"""Find current greenlet's id
Parts of this class will be called in different greenlets. Greenlet Id
is used to differentiate between greenlet specific instance resources
that can't be shared between greenlets.
"""
return id(gevent.getcurrent())
def hello_from(n):
print('Size of group %s' % len(group1))
print('Hello from Greenlet %s' % id(getcurrent()))
def thread_id(self):
"""Return the current thread identifier."""
return gevent.getcurrent()
def current_component_runner():
"""
Get the active Component Runner
Returns
-------
``rill.engine.runner.ComponentRunner``
"""
import gevent
import rill.engine.runner
greenlet = gevent.getcurrent()
assert isinstance(greenlet, rill.engine.runner.ComponentRunner)
return greenlet
def process(self, msg, kwargs):
import gevent
thread = gevent.getcurrent()
# use explicit component if it was provided
comp = kwargs.pop('component', None)
if comp is not None:
show_thread = comp != thread
else:
comp = thread
show_thread = False
args = kwargs.pop('args', None)
if args:
msg = msg.format(*self._format_args(args))
message, n = self._format(kwargs.pop('port', comp))
# FIXME: get maximum port name length:
pad = max(15 - n, 0)
# can't use .format to left justify because of the color codes
message += ' ' * pad
section = kwargs.pop('section', None)
if section:
message += ' {} :'.format(section)
message += ' {}'.format(msg)
if show_thread:
message += colored(" (on thread {})".format(thread), 'yellow')
return message, kwargs
def step_actor(self, actor):
# Pretend that the current greenlet is the Actor to bypass
# actor_message's asserts.
with mock.patch.object(actor, "greenlet"):
actor.greenlet = gevent.getcurrent()
while actor._event_queue:
actor._step()
def step_actor(self, actor):
# Pretend that the current greenlet is the Actor to bypass
# actor_message's asserts.
with mock.patch.object(actor, "greenlet"):
actor.greenlet = gevent.getcurrent()
while actor._event_queue:
actor._step()
def _get_current():
if not utils.in_cpubound_thread() and constants.server_started:
return gevent.getcurrent()
else:
return threading.local()
def _handle(self, client, address):
"Client handle function"
async.Greenlet._reset_locals(gevent.getcurrent())
log.d("Client connected", str(address))
handler = ClientHandler(client, address)
self._clients.add(handler)
try:
buffer = b''
while True:
data, eof = utils.end_of_message(buffer)
if eof:
buffer = data[1]
log.d("Received", sys.getsizeof(buffer), "bytes from ", address)
if handler.is_active():
client_msg = handler.advance(data[0]) # noqa: F841
else:
log.d("Client has disconnected", address)
break
else:
log.d("Received data, EOF not reached. Waiting for more data from ", address)
utils.switch(constants.Priority.High)
r = client.recv(constants.data_size)
if not r:
log.d("Client has disconnected", address)
break
else:
buffer += r
except socket.error as e:
log.exception("Client disconnected with error", e)
finally:
self._clients.remove(handler)
log.d("Client disconnected", str(address))
def __init__(self, f, *a, **kw):
super().__init__(f, *a, **kw)
self._hp_inherit(self, weakref.proxy(gevent.getcurrent()), sys._getframe())
def get_context(key="ctx"):
"Get a dict local to the spawn tree of current greenlet"
l = getattr(gevent.getcurrent(), 'locals', None)
if key is not None and l:
return l[key]
return l
def get_ident(self):
return id(gevent.getcurrent())
def run(self, auto_join=False):
"""
Runs Dissonance, loading all the modules, starting the web service, and starting the adapter.
If auto_join=True, this function will not return, and will run until dissonance stops if starting dissonance from
outside of a greenlet.
"""
if self.running:
raise RuntimeError("Dissonance is already running!")
logger.info("Starting Dissonance v%s", self.version)
logger.info("Starting storage %s", self._storage)
self._storage.start()
logger.info("Loading modules")
self.modules.load_all()
if getattr(self.config, 'web', False) or str(self._opts.get('web', False)).upper() == 'TRUE':
self._web = Web(self, EnvFallbackDict('web', getattr(self.config, 'web_opts', {})))
self._web.start()
if getattr(self.config, 'manhole', False):
from gevent.backdoor import BackdoorServer
manhole_opts = EnvFallbackDict('manhole', getattr(self.config, 'manhole_opts', {}))
self._manhole = BackdoorServer((
manhole_opts.get('listen_host', '127.0.0.1'),
int(manhole_opts.get('listen_port', 9001))
), locals={
'client': self.client
})
self._manhole.start()
logger.info("Attempting to log in as %s" % self._opts['email'])
self.client.login(self._opts['email'], self._opts['password'])
logger.info("Starting connection to Discord")
self.client.start()
self._storage_sync_periodic.start(right_away=False)
self._stop_event.clear()
# If we are the main greenlet, chances are we probably want to never return,
# so the main greenlet won't exit, and tear down everything with it.
if auto_join and gevent.get_hub().parent == gevent.getcurrent():
self.join()