def recursive_repr(fillvalue='...'):
    """Decorator to make a repr function return fillvalue for a recursive call.

    The returned decorator wraps a one-argument, ``__repr__``-style function.
    While the wrapped function is running for a given object on a given
    thread, a nested call for the same object on the same thread returns
    *fillvalue* instead of recursing.
    """

    def decorating_function(user_function):
        # (id(obj), thread ident) pairs whose repr is currently in progress.
        repr_running = set()

        def wrapper(self):
            key = id(self), get_ident()
            if key in repr_running:
                return fillvalue
            repr_running.add(key)
            try:
                result = user_function(self)
            finally:
                # Clear the marker even when user_function raises.
                repr_running.discard(key)
            return result

        # Can't use functools.wraps() here because of bootstrap issues
        wrapper.__module__ = getattr(user_function, '__module__')
        wrapper.__doc__ = getattr(user_function, '__doc__')
        wrapper.__name__ = getattr(user_function, '__name__')
        # __qualname__ is not defined on every callable (e.g. Python 2
        # functions), so copy it only when the wrapped function has one.
        # Without this the wrapper reports '...<locals>.wrapper'.
        if hasattr(user_function, '__qualname__'):
            wrapper.__qualname__ = user_function.__qualname__
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
python类get_ident()的实例源码
def __init__(self, loop=None, default=None):
    """Initialise the greenlet base and attach an event loop to this object.

    loop -- either a ready-made loop object (anything exposing a ``run``
        attribute), which is stored as-is, or a value passed to the loop
        class as ``flags``.  When it is None, ``self.backend`` is used.
    default -- forwarded to the loop class.  It must be left as None when
        a ready-made loop object is supplied, otherwise TypeError is raised.
    """
    greenlet.__init__(self)

    if not hasattr(loop, 'run'):
        # No loop object was handed in, so build one from self.loop_class.
        # Outside the main thread an unspecified default becomes False.
        if default is None and get_ident() != MAIN_THREAD:
            default = False
        loop_class = _import(self.loop_class)
        if loop is None:
            loop = self.backend
        self.loop = loop_class(flags=loop, default=default)
    elif default is not None:
        raise TypeError("Unexpected argument: default")
    else:
        self.loop = loop

    self._resolver = None
    self._threadpool = None
    # self.format_context is passed through _import and stored back.
    self.format_context = _import(self.format_context)
def recursive_repr(fillvalue='...'):
    """Build a decorator that short-circuits recursive repr calls.

    The decorated one-argument function returns *fillvalue* whenever it is
    re-entered for the same object on the same thread while an outer call
    for that object is still running.
    """

    def decorating_function(user_function):
        # Markers of the form (id(obj), thread ident) for calls in flight.
        in_progress = set()

        def wrapper(self):
            marker = (id(self), get_ident())
            if marker not in in_progress:
                in_progress.add(marker)
                try:
                    return user_function(self)
                finally:
                    in_progress.discard(marker)
            return fillvalue

        # Can't use functools.wraps() here because of bootstrap issues,
        # so the metadata is copied across by hand.
        for attr in ('__module__', '__doc__', '__name__', '__qualname__'):
            setattr(wrapper, attr, getattr(user_function, attr))
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
def get_ident():
    """Stand-in for _thread.get_ident().

    This module is only meant to be used when the real thread module is
    unavailable, so the current process is assumed to be the only thread
    and the same constant identifier is handed back on every call.
    """
    return -1
def __init__(self):
    """Give the instance an empty storage dict and an identity function.

    Both attributes are written with ``object.__setattr__`` rather than by
    plain assignment.
    NOTE(review): presumably the enclosing class customises attribute
    assignment -- confirm against the class definition.
    """
    initial_state = (
        ('__storage__', {}),
        ('__ident_func__', get_ident),
    )
    for attr_name, attr_value in initial_state:
        object.__setattr__(self, attr_name, attr_value)
def get_ident(self):
    """Return the identifier of the current context.

    This is the value the local objects use internally for the context.
    Overriding this method does not change that behavior; use it to link
    other context local objects (such as SQLAlchemy's scoped sessions) to
    the Werkzeug locals.

    .. versionchanged:: 0.7
       A different ident function can be passed to the local manager; it
       is then propagated to all the locals passed to the constructor.
    """
    ident_func = self.ident_func
    return ident_func()
def __init__(self):
    """Give the instance an empty storage dict and an identity function.

    Both attributes are written with ``object.__setattr__`` rather than by
    plain assignment.
    NOTE(review): presumably the enclosing class customises attribute
    assignment -- confirm against the class definition.
    """
    initial_state = (
        ('__storage__', {}),
        ('__ident_func__', get_ident),
    )
    for attr_name, attr_value in initial_state:
        object.__setattr__(self, attr_name, attr_value)
def get_ident(self):
    """Return the identifier of the current context.

    This is the value the local objects use internally for the context.
    Overriding this method does not change that behavior; use it to link
    other context local objects (such as SQLAlchemy's scoped sessions) to
    the Werkzeug locals.

    .. versionchanged:: 0.7
       A different ident function can be passed to the local manager; it
       is then propagated to all the locals passed to the constructor.
    """
    ident_func = self.ident_func
    return ident_func()
def __init__(self):
    """Give the instance an empty storage dict and an identity function.

    Both attributes are written with ``object.__setattr__`` rather than by
    plain assignment.
    NOTE(review): presumably the enclosing class customises attribute
    assignment -- confirm against the class definition.
    """
    initial_state = (
        ('__storage__', {}),
        ('__ident_func__', get_ident),
    )
    for attr_name, attr_value in initial_state:
        object.__setattr__(self, attr_name, attr_value)
def get_ident(self):
    """Return the identifier of the current context.

    This is the value the local objects use internally for the context.
    Overriding this method does not change that behavior; use it to link
    other context local objects (such as SQLAlchemy's scoped sessions) to
    the Werkzeug locals.

    .. versionchanged:: 0.7
       A different ident function can be passed to the local manager; it
       is then propagated to all the locals passed to the constructor.
    """
    ident_func = self.ident_func
    return ident_func()
def __enter__(self):
    """Flag the calling thread as sending, then take the send lock."""
    # Socket I/O is about to happen: mark the thread so that no debug
    # events are delivered while the standard library itself is being
    # debugged.
    sender = get_thread_from_id(thread.get_ident())
    if sender is not None:
        sender.is_sending = True

    send_lock.acquire()
def __exit__(self, exc_type, exc_value, tb):
    """Release the send lock and clear the calling thread's sending flag.

    When the managed block raised, the debugger threads and process are
    detached and True is returned so the exception is suppressed.
    """
    send_lock.release()

    # debug events may be delivered to this thread again
    sender = get_thread_from_id(thread.get_ident())
    if sender is not None:
        sender.is_sending = False

    if exc_type is None:
        return

    detach_threads()
    detach_process()
    # swallow the exception, we're no longer debugging
    return True
def __init__(self, id = None):
    """Set up the debugger-side state tracked for one thread.

    id -- identifier to record for the thread; when None, the identifier
        of the calling thread (thread.get_ident()) is used.
    """
    self.id = thread.get_ident() if id is None else id

    # trace event name -> bound handler method
    self._events = {
        'call' : self.handle_call,
        'line' : self.handle_line,
        'return' : self.handle_return,
        'exception' : self.handle_exception,
        'c_call' : self.handle_c_call,
        'c_return' : self.handle_c_return,
        'c_exception' : self.handle_c_exception,
    }

    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None

    # _block_lock is acquired right after allocation, so it starts out held
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()

    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False

    # replace self.trace_func w/ a bound method so we don't need to
    # re-create these regularly
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False

    # stackless changes
    if stackless is not None:
        self._stackless_attach()

    if sys.platform == 'cli':
        self.frames = []
def block(self, block_lambda, keep_stopped_on_line = False):
    """blocks the current thread until the debugger resumes it

    block_lambda -- zero-argument callable invoked exactly once, while
        _block_starting_lock is held and right after the thread has been
        flagged as blocked.
    keep_stopped_on_line -- when false (the default), stopped_on_line is
        refreshed from the current frame's line number before blocking.

    While blocked, every release of _block_lock with unblock_work set runs
    that work item and blocks again.  A release with unblock_work left as
    None, or DETACHED becoming true, ends the wait.
    """
    assert not self._is_blocked
    #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident()) # we should only ever block ourselves

    # send thread frames before we block
    self.enum_thread_frames_locally()

    if not keep_stopped_on_line:
        self.stopped_on_line = self.cur_frame.f_lineno

    # need to synchronize w/ sending the reason we're blocking.  The lock
    # is taken through `with` so it is released even if block_lambda raises,
    # instead of staying held forever.
    with self._block_starting_lock:
        self._is_blocked = True
        block_lambda()

    while not DETACHED:
        self._block_lock.acquire()
        if self.unblock_work is None:
            break

        # the debugger wants us to do something, do it, and then block again
        self._is_working = True
        self.unblock_work()
        self.unblock_work = None
        self._is_working = False

    # `with` again so a failing assertion cannot leave the lock held
    with self._block_starting_lock:
        assert self._is_blocked
        self._is_blocked = False
def unblock(self):
    """Let the blocked thread represented by this object run again.

    Must be called from a thread other than the blocked one; the wait is
    ended by releasing _block_lock.
    """
    assert self._is_blocked
    # only someone else should unblock us
    caller_id = thread.get_ident()
    assert self.id != caller_id
    self._block_lock.release()
def write(self, data):
    """Write *data* to the wrapped buffer, mirroring it to the debugger.

    While the debugger is attached, the data is decoded with utf_8 and
    sent over the connection as an OUTP message tagged with the calling
    thread's identifier.  The data is always written to self.buffer.
    """
    still_attached = not DETACHED
    if still_attached:
        probe_stack(3)
        decoded = utf_8.decode(data)
        text = decoded[0]
        with _SendLockCtx:
            write_bytes(conn, OUTP)
            write_int(conn, thread.get_ident())
            write_string(conn, text)

    self.buffer.write(data)
def break_into_debugger():
    """If a PTVS remote debugger is attached, pauses execution of all threads,
    and breaks into the debugger with current thread as active.

    Does nothing when the debugger is detached.
    """
    if vspd.DETACHED:
        return

    # store the calling thread's identifier, then flag every thread
    vspd.SEND_BREAK_COMPLETE = thread.get_ident()
    vspd.mark_all_threads_for_break()
def __enter__(self):
    """Flag the calling thread as sending, then take the send lock."""
    # Socket I/O is about to happen: mark the thread so that no debug
    # events are delivered while the standard library itself is being
    # debugged.
    sender = get_thread_from_id(thread.get_ident())
    if sender is not None:
        sender.is_sending = True

    send_lock.acquire()
def __exit__(self, exc_type, exc_value, tb):
    """Release the send lock and clear the calling thread's sending flag.

    When the managed block raised, the debugger threads and process are
    detached and True is returned so the exception is suppressed.
    """
    send_lock.release()

    # debug events may be delivered to this thread again
    sender = get_thread_from_id(thread.get_ident())
    if sender is not None:
        sender.is_sending = False

    if exc_type is None:
        return

    detach_threads()
    detach_process()
    # swallow the exception, we're no longer debugging
    return True
def __init__(self, id = None):
    """Set up the debugger-side state tracked for one thread.

    id -- identifier to record for the thread; when None, the identifier
        of the calling thread (thread.get_ident()) is used.
    """
    self.id = thread.get_ident() if id is None else id

    # trace event name -> bound handler method
    self._events = {
        'call' : self.handle_call,
        'line' : self.handle_line,
        'return' : self.handle_return,
        'exception' : self.handle_exception,
        'c_call' : self.handle_c_call,
        'c_return' : self.handle_c_return,
        'c_exception' : self.handle_c_exception,
    }

    self.cur_frame = None
    self.stepping = STEPPING_NONE
    self.unblock_work = None

    # _block_lock is acquired right after allocation, so it starts out held
    self._block_lock = thread.allocate_lock()
    self._block_lock.acquire()
    self._block_starting_lock = thread.allocate_lock()

    self._is_blocked = False
    self._is_working = False
    self.stopped_on_line = None
    self.detach = False

    # replace self.trace_func w/ a bound method so we don't need to
    # re-create these regularly
    self.trace_func = self.trace_func
    self.prev_trace_func = None
    self.trace_func_stack = []

    self.reported_process_loaded = False
    self.django_stepping = None
    self.is_sending = False

    # stackless changes
    if stackless is not None:
        self._stackless_attach()

    if sys.platform == 'cli':
        self.frames = []