def restore_profiler():
    """Re-install the typechecking profiler saved for the calling thread.

    If a typechecking profiler is active, e.g. one created by
    pytypes.set_global_typechecked_profiler(), it must be restored
    whenever a TypeCheckError is caught; otherwise the typechecking
    profiler is implicitly disabled. The call must come from the thread
    that raised the error.

    Alternatively pytypes can be switched into warning mode, in which
    no calls to this function are required (unless
    filterwarnings("error") or something similar is in use).
    """
    thread_id = threading.current_thread().ident

    # A hook that is still installed is unexpected here; warn but carry on.
    if sys.getprofile() is not None:
        warn("restore_profiler: Current profile is not None!")

    if thread_id in _saved_profilers:
        saved = _saved_profilers[thread_id]
        sys.setprofile(saved)
        # The entry is single-use: forget it once it has been re-installed.
        del _saved_profilers[thread_id]
    else:
        warn("restore_profiler: No saved profiler for calling thread!")
# Example source code using Python's setprofile()
def start(self):
    """Install this instance as the profiling hook of the calling thread.

    The hook that was installed beforehand is stored in
    ``_previous_profiler``. When ``all_threads`` is true the instance is
    also registered through ``threading.setprofile`` and the previous
    threading hook is stored in ``_previous_thread_profiler``.

    Raises:
        RuntimeError: if the checker is already active, or is currently
            in the middle of starting up.
    """
    if self._active:
        raise RuntimeError('type checker already running')
    if self._pending:
        raise RuntimeError('type checker already starting up')
    self._pending = True

    # Remember the hook that was in place before this instance took over.
    previous = sys.getprofile()
    self._previous_profiler = previous
    self._set_caller_level_shift(0)
    sys.setprofile(self)

    # Optionally become the default profiler for threads started from now
    # on; threads that already exist are not affected.
    if self.all_threads:
        self._previous_thread_profiler = threading._profile_hook
        threading.setprofile(self)

    self._active = True
    self._pending = False
def stop(self):
    """Uninstall this instance and put the previous profiling hooks back.

    Does nothing unless the instance is active and not currently in a
    start/stop transition. A warning is emitted when the system or
    threading hook is no longer the one this instance installed.
    """
    if not self._active or self._pending:
        return
    self._pending = True

    if sys.getprofile() is self:
        previous = self._previous_profiler
        sys.setprofile(previous)
        # A reinstated TypeAgent gets its caller level shift reset.
        if previous is not None and isinstance(previous, TypeAgent):
            previous._set_caller_level_shift(0)
    elif sys.getprofile() is not None or not self._cleared:
        # Only a hook that is None while ``_cleared`` is set is tolerated.
        warn('the system profiling hook has changed unexpectedly')

    if self.all_threads:
        if threading._profile_hook is self:
            threading.setprofile(self._previous_thread_profiler)
        else:  # pragma: no cover
            warn('the threading profiling hook has changed unexpectedly')

    self._active = False
    self._pending = False
def _trace_cmdline(self, *args, **kwargs):
    """Run ``self.func`` and return the value of its local ``cmd_line``.

    A temporary profile hook copies the locals of every frame that
    reports a ``'return'`` event into ``self._locals``; the copy left
    over after the call is the one the result is read from.

    The profile hook that was installed before the call is put back
    afterwards (also when ``self.func`` raises), so an outer profiler is
    not silently switched off.

    Args:
        *args, **kwargs: forwarded unchanged to ``self.func``.

    Returns:
        The object bound to ``cmd_line`` in the captured locals.

    Raises:
        KeyError: if the captured locals contain no ``cmd_line``.
    """
    def tracer(frame, event, arg):
        if event == 'return':
            self._locals = dict(frame.f_locals)

    # Remember the current hook so it can be reinstated afterwards.
    previous_profiler = sys.getprofile()
    sys.setprofile(tracer)
    try:
        # Only the side effect on self._locals matters; the return value
        # of the traced function is not used.
        self.func(*args, **kwargs)
    finally:
        sys.setprofile(previous_profiler)
    return self._locals['cmd_line']
def __bootstrap(self):
    """Thread entry point: register the thread, install hooks, run it, clean up.

    The thread is moved from ``threading._limbo`` to ``threading._active``,
    the module-wide trace and profile hooks (if any) are installed via
    ``sys.settrace`` / ``sys.setprofile``, and ``self.run()`` is executed.
    On the way out the thread is marked as stopped and its entry is
    removed from ``threading._active``.
    """
    try:
        self._set_ident()
        self._Thread__started.set()
        # Use ``with`` so the lock is released even if the limbo entry is
        # missing; a lock left held would block the cleanup section below.
        with threading._active_limbo_lock:
            threading._active[self._Thread__ident] = self
            del threading._limbo[self]

        if threading._trace_hook:
            sys.settrace(threading._trace_hook)
        if threading._profile_hook:
            sys.setprofile(threading._profile_hook)

        try:
            self.run()
        finally:
            self._Thread__exc_clear()
    finally:
        with threading._active_limbo_lock:
            self._Thread__stop()
            try:
                del threading._active[threading._get_ident()]
            except:
                # NOTE(review): left deliberately broad as best-effort
                # cleanup; presumably this must never raise while the
                # thread is exiting - confirm before narrowing.
                pass
def start():
    """
    Turn on profiling.

    Prints a notice and returns if profiling is already active. Runs a
    default ``setup()`` when none has been done yet, records the start
    time, pushes the ``'$total'`` entry on the call stack and installs
    ``_instance_profile_callback`` as the profile function.

    Raises:
        RuntimeError: if another profile function is already installed.
            The check happens before any bookkeeping is modified, so a
            failed start leaves the profiling state untouched.
    """
    global _profile_start, _profile_setup, _call_stack, _inst_data
    if _profile_start is not None:
        print("profiling is already active.")
        return
    if not _profile_setup:
        setup()  # just do a default setup

    # Refuse to start before touching _profile_start / _call_stack;
    # otherwise a failed start would look "active" with no hook installed.
    if sys.getprofile() is not None:
        raise RuntimeError("another profile function is already active.")

    _profile_start = etime()
    _call_stack.append(('$total', _profile_start, None))
    if '$total' not in _inst_data:
        _inst_data['$total'] = [None, 0., 0]
    sys.setprofile(_instance_profile_callback)
def stop():
    """
    Turn off profiling.

    Returns immediately when profiling is not running. Otherwise the
    profile function is removed, the top call-stack entry is popped, and
    the elapsed time and call count of the ``'$total'`` record are
    updated.
    """
    global _profile_total, _profile_start, _call_stack, _inst_data
    if _profile_start is None:
        return

    sys.setprofile(None)
    _call_stack.pop()

    elapsed = etime() - _profile_start
    _profile_total += elapsed

    total_record = _inst_data['$total']
    total_record[1] = _profile_total
    total_record[2] += 1

    # Mark profiling as inactive.
    _profile_start = None
def register_request(request, settings):
    """Initialise the per-request snoopy state and start the configured profilers.

    A fresh data record (request path and method, empty collections and
    the start time) is stored on ``_snoopy_request`` together with the
    request, the settings, the installed Django apps and the app root.

    Depending on ``settings``:
      * ``USE_CPROFILE`` - a ``cProfile.Profile`` is created and enabled.
      * ``USE_BUILTIN_PROFILER`` - ``SnoopyRequest.profile`` is installed
        with ``sys.setprofile``.
    """
    data = {
        'request': request.path,
        'method': request.method,
        'queries': [],
        'profiler_traces': [],
        'custom_attributes': {},
        'start_time': datetime.datetime.now()
    }

    _snoopy_request.request = request
    _snoopy_request.data = data
    _snoopy_request.settings = settings
    _snoopy_request.current_function_key = [None, None]

    from django.conf import settings as django_settings
    _snoopy_request.relevant_apps = tuple(django_settings.INSTALLED_APPS)

    _snoopy_request.app_root = get_app_root()

    if _snoopy_request.settings.get('USE_CPROFILE'):
        _snoopy_request.profiler = cProfile.Profile()
        _snoopy_request.profiler.enable()

    if _snoopy_request.settings.get('USE_BUILTIN_PROFILER'):
        sys.setprofile(SnoopyRequest.profile)
def register_response(response):
    """Stop the profilers started for the request and finalise its data record.

    The builtin profile hook is cleared when ``USE_BUILTIN_PROFILER`` is
    set. End time and total request time are added to the record. When
    ``USE_CPROFILE`` is set, the cProfile statistics are printed (sorted
    by cumulative time) into a string, which is passed through
    ``clean_profiler_result`` unless ``CPROFILE_SHOW_ALL_FUNCTIONS`` is
    set, and stored under ``'profiler_result'``.

    ``response`` is not used by this function.

    Returns:
        The request's data record.
    """
    if _snoopy_request.settings.get('USE_BUILTIN_PROFILER'):
        sys.setprofile(None)

    data = _snoopy_request.data
    data['end_time'] = datetime.datetime.now()
    data['total_request_time'] = data['end_time'] - data['start_time']

    if _snoopy_request.settings.get('USE_CPROFILE'):
        _snoopy_request.profiler.disable()

        # Render the statistics into an in-memory buffer instead of stdout.
        buffer = StringIO.StringIO()
        stats = pstats.Stats(_snoopy_request.profiler, stream=buffer)
        stats = stats.sort_stats('cumulative')
        stats.print_stats()
        report = buffer.getvalue()

        if not _snoopy_request.settings.get('CPROFILE_SHOW_ALL_FUNCTIONS'):
            report = clean_profiler_result(report)
        data['profiler_result'] = report

    return data
def runctx(self, cmd, globals, locals):
    """Execute ``cmd`` in the given namespaces under ``self.dispatcher``.

    ``self.set_cmd(cmd)`` is called first, then ``self.dispatcher`` is
    installed as the profile function while ``cmd`` runs. The profile
    function is reset to ``None`` afterwards, also when ``cmd`` raises.

    Args:
        cmd: the code to execute.
        globals: global namespace for the execution.
        locals: local namespace for the execution.

    Returns:
        ``self``.
    """
    self.set_cmd(cmd)
    sys.setprofile(self.dispatcher)
    try:
        # Call form of exec: Python 2 treats the tuple like
        # ``exec cmd in globals, locals``; Python 3 requires it.
        exec(cmd, globals, locals)
    finally:
        sys.setprofile(None)
    return self
# This method is more useful to profile a single function call.
def setprofile(func):
    """Store ``func`` in the module-level ``_profile_hook``."""
    global _profile_hook
    _profile_hook = func
def profile_on():
    """Reset the profiling globals and install ``profiler`` as the profile hook.

    ``p_stats`` is replaced by an empty dict and ``p_start_time`` by the
    current time; ``profiler`` is then registered both with
    ``threading.setprofile`` and with ``sys.setprofile``.
    """
    global p_stats, p_start_time
    p_stats, p_start_time = {}, time()
    threading.setprofile(profiler)
    sys.setprofile(profiler)
def profile_off():
    """Clear the profile hook registered with ``threading`` and with ``sys``."""
    for install in (threading.setprofile, sys.setprofile):
        install(None)
def runctx(self, cmd, globals, locals):
    """Profile the execution of ``cmd`` in the supplied namespaces.

    After ``self.set_cmd(cmd)``, ``self.dispatcher`` is installed as the
    profile function for the duration of the execution. The profile
    function is set back to ``None`` when execution ends, whether or not
    ``cmd`` raised.

    Args:
        cmd: the code to execute.
        globals: global namespace for the execution.
        locals: local namespace for the execution.

    Returns:
        ``self``.
    """
    self.set_cmd(cmd)
    sys.setprofile(self.dispatcher)
    try:
        # The call form works on Python 2 (equivalent to the
        # ``exec ... in ...`` statement) and is mandatory on Python 3.
        exec(cmd, globals, locals)
    finally:
        sys.setprofile(None)
    return self
# This method is more useful to profile a single function call.
def setprofile(func):
    """Register a profile function for threads started via the threading module.

    ``func`` is stored in the module-level ``_profile_hook``; it is
    handed to sys.setprofile() for each thread before that thread's
    run() method is called.
    """
    global _profile_hook
    _profile_hook = func
def setprofile(func):
    """Install ``func`` as the profile function for all threading-module threads.

    The function is kept in the module-level ``_profile_hook`` and is
    passed to sys.setprofile() for every thread just before its run()
    method is invoked.
    """
    global _profile_hook
    _profile_hook = func
def start_profiler():
    """Start a new profiling session for greenlets and threads.

    A fresh ``GlobalState`` replaces the module-level ``_state``, and the
    thread state of the current greenlet (at this function's own frame)
    is recorded as its ``last`` entry. Afterwards the greenlet trace
    hook and the ``sys`` / ``threading`` profile hooks are installed.
    """
    global _state
    _state = GlobalState()

    # The state has to be fully set up before any handler is installed.
    caller_frame = sys._getframe(0)
    active_greenlet = greenlet.getcurrent()  # pylint: disable=no-member
    _state.last = ensure_thread_state(active_greenlet, caller_frame)

    greenlet.settrace(greenlet_profiler)  # pylint: disable=no-member
    sys.setprofile(thread_profiler)
    threading.setprofile(thread_profiler)
def stop_profiler():
    """End the profiling session by removing all installed hooks.

    ``_state`` is left in place so the collected data stays available
    to the user until the next session starts.
    """
    # The hooks are removed in exactly this order; any other order would
    # add extra measurements at the end of the session.
    sys.setprofile(None)
    threading.setprofile(None)
    greenlet.settrace(None)  # pylint: disable=no-member
def setprofile(func):
    """Set the profile function used by threads started from the threading module.

    ``func`` is saved in the module-level ``_profile_hook`` and given to
    sys.setprofile() for each thread before its run() method is called.
    """
    global _profile_hook
    _profile_hook = func
def setprofile(func):
    """Choose the profile function for every thread the threading module starts.

    The module-level ``_profile_hook`` is set to ``func``; each thread
    passes it to sys.setprofile() before calling its run() method.
    """
    global _profile_hook
    _profile_hook = func
def trace_calls(
    logger: CallTraceLogger,
    code_filter: Optional[CodeFilter] = None,
    sample_rate: Optional[int] = None,
) -> Iterator[None]:
    """Enable call tracing for a block of code.

    A ``CallTracer`` built from the arguments is installed as the
    profile function before control is yielded. Once the block is left,
    normally or through an exception, the previously installed profile
    function is put back and ``logger.flush()`` is called.
    """
    previous_profiler = sys.getprofile()
    tracer = CallTracer(logger, code_filter, sample_rate)
    sys.setprofile(tracer)
    try:
        yield
    finally:
        sys.setprofile(previous_profiler)
        logger.flush()
def runctx(self, cmd, globals, locals):
    """Run ``cmd`` in the given namespaces with ``self.dispatcher`` profiling it.

    ``self.set_cmd(cmd)`` is called before the dispatcher is installed.
    The profile function is reset to ``None`` once execution is over,
    including when ``cmd`` raises.

    Returns:
        ``self``.
    """
    self.set_cmd(cmd)
    hook = self.dispatcher
    sys.setprofile(hook)
    try:
        exec(cmd, globals, locals)
    finally:
        # Always switch profiling off again.
        sys.setprofile(None)
    return self
# This method is more useful to profile a single function call.
def test_getdefaultencoding(self):
self.assertRaises(TypeError, sys.getdefaultencoding, 42)
# can't check more than the type, as the user might have changed it
self.assertIsInstance(sys.getdefaultencoding(), str)
# testing sys.settrace() is done in test_sys_settrace.py
# testing sys.setprofile() is done in test_sys_setprofile.py
def setUp(self):
    """Make sure no profile function is installed before a test runs."""
    sys.setprofile(None)
def tearDown(self):
    """Remove any profile function left installed after a test has run."""
    sys.setprofile(None)
def callback(self, frame, event, arg):
    """Forward a profile/trace event to the handler registered for it.

    This is the callback registered with sys.setprofile()/sys.settrace().
    The handler is looked up in ``self.dispatch`` by event name and is
    called with ``self`` and ``frame``; ``arg`` is not passed on.
    """
    handler = self.dispatch[event]
    handler(self, frame)
def capture_events(callable, p=None):
    """Run ``callable`` under a profile hook and return the recorded events.

    Args:
        callable: the object handed to ``protect`` together with the watcher.
        p: the watcher whose ``callback`` is installed as the profile
            function; a new ``HookWatcher`` is created when omitted.

    Returns:
        The watcher's events without the first and the last entry.
    """
    watcher = HookWatcher() if p is None else p

    # Keep the garbage collector out of the way so that __del__ methods
    # do not show up in the traces; it is re-enabled only if it was on.
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        sys.setprofile(watcher.callback)
        protect(callable, watcher)
        sys.setprofile(None)
    finally:
        if gc_was_enabled:
            gc.enable()

    # NOTE(review): the trimmed first/last events presumably stem from
    # the sys.setprofile() calls themselves - confirm.
    events = watcher.get_events()
    return events[1:-1]
def setprofile(func):
    """Remember ``func`` as the module-level ``_profile_hook``."""
    global _profile_hook
    _profile_hook = func
def runctx(self, cmd, globals, locals):
    """Execute ``cmd`` with profiling routed through ``self.dispatcher``.

    The command is first recorded with ``self.set_cmd(cmd)``. While it
    executes in the given namespaces, ``self.dispatcher`` is the active
    profile function; afterwards the profile function is ``None`` again,
    even if ``cmd`` raised.

    Args:
        cmd: the code to execute.
        globals: global namespace for the execution.
        locals: local namespace for the execution.

    Returns:
        ``self``.
    """
    self.set_cmd(cmd)
    sys.setprofile(self.dispatcher)
    try:
        # exec() as a call is valid on both Python 2 and Python 3; the
        # ``exec cmd in ...`` statement only parses on Python 2.
        exec(cmd, globals, locals)
    finally:
        sys.setprofile(None)
    return self
# This method is more useful to profile a single function call.
def test_getdefaultencoding(self):
    """Check sys.getdefaultencoding() when unicode support is available.

    Nothing is checked when ``test.test_support.have_unicode`` is false.
    """
    if not test.test_support.have_unicode:
        return
    self.assertRaises(TypeError, sys.getdefaultencoding, 42)
    # Only the type can be verified: the user might have changed the value.
    encoding = sys.getdefaultencoding()
    self.assertIsInstance(encoding, str)
# testing sys.settrace() is done in test_sys_settrace.py
# testing sys.setprofile() is done in test_sys_setprofile.py