def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
python类get_ident()的实例源码
singledispatch_helpers.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
singledispatch_helpers.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
singledispatch_helpers.py 文件源码
项目:noc-orchestrator
作者: DirceuSilvaLabs
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
def _recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
def remind(self):
while True:
(self.lowest_time,remind_for,chat) = self.find_next()
if self.lowest_time!=-1:
sleep_for = self.lowest_time - time.time()
if (sleep_for<=0):
reply="Should have reminded you for \"`{}`\" on {}, sorry...".format("`\" and \"`".join(self.reminders[chat][str(self.lowest_time)]),time.strftime("%d.%m.%Y - %H:%M:%S",time.localtime(self.lowest_time)))
self.bot.sendMessage(chat,reply,parse_mode="Markdown")
self.reminders[chat].pop(str(self.lowest_time))
self.save_reminders(chat)
else:
time.sleep(sleep_for)
if self.remindthread==thread.get_ident():
reply="Reminder for \"`{}`\". It is now {}.".format("`\" and \"`".join(self.reminders[chat][str(self.lowest_time)]),time.strftime("%d.%m.%Y - %H:%M:%S",time.localtime(self.lowest_time)))
self.bot.sendMessage(chat,reply,parse_mode="Markdown")
self.reminders[chat].pop(str(self.lowest_time))
self.save_reminders(chat)
else:
# other thread took over...
return
else:
#no reminder left
return
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
elif _threadlocal.loop is not None:
# Reuse a loop instance previously set by
# destroying a hub without destroying the associated
# loop. See #237 and #238.
self.loop = _threadlocal.loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def __init__(self, f, n, wait_before_exit=False):
"""
Construct a bunch of `n` threads running the same function `f`.
If `wait_before_exit` is True, the threads won't terminate until
do_finish() is called.
"""
self.f = f
self.n = n
self.started = []
self.finished = []
self._can_exit = not wait_before_exit
def task():
tid = get_ident()
self.started.append(tid)
try:
f()
finally:
self.finished.append(tid)
while not self._can_exit:
_wait()
for i in range(n):
start_new_thread(task, ())
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
singledispatch_helpers.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
def release(self):
"""Release a lock, decrementing the recursion level.
If after the decrement it is zero, reset the lock to unlocked (not owned
by any thread), and if any other threads are blocked waiting for the
lock to become unlocked, allow exactly one of them to proceed. If after
the decrement the recursion level is still nonzero, the lock remains
locked and owned by the calling thread.
Only call this method when the calling thread owns the lock. A
RuntimeError is raised if this method is called when the lock is
unlocked.
There is no return value.
"""
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
if not count:
self._owner = None
self._block.release()
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
"Return ellipsis on recursive re-entry to function."
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
"Return ellipsis on recursive re-entry to function."
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
"Return ellipsis on recursive re-entry to function."
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
"Return ellipsis on recursive re-entry to function."
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
chainmap_impl.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
return wrapper
return decorating_function
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
"Return ellipsis on recursive re-entry to function."
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
"Return ellipsis on recursive re-entry to function."
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def recursive_repr(func):
"""Decorator to prevent infinite repr recursion."""
repr_running = set()
@wraps(func)
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return '...'
repr_running.add(key)
try:
return func(self)
finally:
repr_running.discard(key)
return wrapper
def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def __init__(self, loop=None, default=None):
greenlet.__init__(self)
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
self.loop = loop
else:
if default is None and get_ident() != MAIN_THREAD:
default = False
loop_class = _import(self.loop_class)
if loop is None:
loop = self.backend
self.loop = loop_class(flags=loop, default=default)
self._resolver = None
self._threadpool = None
self.format_context = _import(self.format_context)
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
def release(self):
"""Release a lock, decrementing the recursion level.
If after the decrement it is zero, reset the lock to unlocked (not owned
by any thread), and if any other threads are blocked waiting for the
lock to become unlocked, allow exactly one of them to proceed. If after
the decrement the recursion level is still nonzero, the lock remains
locked and owned by the calling thread.
Only call this method when the calling thread owns the lock. A
RuntimeError is raised if this method is called when the lock is
unlocked.
There is no return value.
"""
if self._owner != get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count = count = self._count - 1
if not count:
self._owner = None
self._block.release()