def run(self):
    # The code before the while loop does NOT run atomically, and
    # neither do individual loop iterations, so shared state still
    # needs explicit synchronization.
    # For thread-safe locals: L = threading.local(), then L.foo = "baz".
    django.setup()
    self.logger.info('Worker starts')
    while not self._stopevent.is_set():
        if not self.worker_queue.empty():
            task = self.worker_queue.get()
            try:
                self.run_task(task)
            except Exception as e:
                helpers.save_task_failed(task, e)
            else:
                helpers.save_task_success(task)
    self.worker_queue = None
    self.logger.warning('Worker stopped, %s tasks handled', self.tasks_counter)
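A minimal, self-contained sketch of the same pattern (task queue plus stop event); the names `tasks` and `handle` are illustrative, and a blocking `get(timeout=...)` replaces the `empty()` check to avoid spinning:

import queue
import threading
import time

stop = threading.Event()
tasks = queue.Queue()

def handle(task):
    print('handled', task)

def worker():
    while not stop.is_set():
        try:
            # A blocking get with a timeout avoids busy-waiting on empty().
            task = tasks.get(timeout=0.1)
        except queue.Empty:
            continue
        handle(task)

t = threading.Thread(target=worker)
t.start()
for i in range(3):
    tasks.put(i)
time.sleep(0.5)   # give the worker time to drain the queue
stop.set()
t.join()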
def do_extend(self, additional_time):
pipe = self.redis.pipeline()
pipe.watch(self.name)
lock_value = pipe.get(self.name)
if lock_value != self.local.token:
raise LockError("Cannot extend a lock that's no longer owned")
expiration = pipe.pttl(self.name)
if expiration is None or expiration < 0:
# Redis evicted the lock key between the previous get() and now
# we'll handle this when we call pexpire()
expiration = 0
pipe.multi()
pipe.pexpire(self.name, expiration + int(additional_time * 1000))
try:
response = pipe.execute()
except WatchError:
# someone else acquired the lock
raise LockError("Cannot extend a lock that's no longer owned")
if not response[0]:
# pexpire returns False if the key doesn't exist
raise LockError("Cannot extend a lock that's no longer owned")
return True
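For context, a hedged sketch of how acquire() and extend() pair up, assuming redis-py's Lock API; `redis_client` and the step functions are placeholders:

lock = redis_client.lock('job:42', timeout=10)   # redis.StrictRedis.lock()
if lock.acquire(blocking=True):
    try:
        long_running_step_one()
        lock.extend(10)   # invokes do_extend(); pushes the TTL out 10 more seconds
        long_running_step_two()
    finally:
        lock.release()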
def __new__(cls, *args, **kw):
self = object.__new__(cls)
key = '_local__key', 'thread.local.' + str(id(self))
object.__setattr__(self, '_local__key', key)
object.__setattr__(self, '_local__args', (args, kw))
object.__setattr__(self, '_local__lock', RLock())
if (args or kw) and (cls.__init__ is object.__init__):
raise TypeError("Initialization arguments are not supported")
# We need to create the thread dict in anticipation of
# __init__ being called, to make sure we don't call it
# again ourselves.
dict = object.__getattribute__(self, '__dict__')
current_thread().__dict__[key] = dict
return self
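This is the pure-Python fallback implementation of threading.local. The key behavior it preserves: a subclass's __init__ is re-run, with the args saved at construction time, in every thread that touches the instance. A small demonstration:

import threading

class Connection(threading.local):
    def __init__(self, host):
        # Runs once per thread that touches the instance,
        # with the args saved at construction time.
        self.host = host
        self.requests = 0

conn = Connection('example.com')

def worker():
    conn.requests += 1                 # each thread sees its own counter
    print(conn.host, conn.requests)    # prints 1 in every thread

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()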
def process_request(self, request):
"""
Get the current user from the request, then prepare and connect a
signal receiver with that user already attached to it.
"""
# Initialize thread local storage
threadlocal.actionslog = {
'signal_duid': (self.__class__, time.time()),
'remote_ip': request.META.get('REMOTE_ADDR'),
}
# In case of proxy, set 'original' address
if request.META.get('HTTP_X_FORWARDED_FOR'):
threadlocal.actionslog['remote_ip'] = request.META.get('HTTP_X_FORWARDED_FOR').split(',')[0]
# Connect signal for automatic logging
if hasattr(request, 'user') and hasattr(request.user, 'is_authenticated') and request.user.is_authenticated():
set_user = curry(self.set_user, request.user)
pre_save.connect(set_user, sender=LogAction, dispatch_uid=threadlocal.actionslog['signal_duid'], weak=False)
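A plausible counterpart to the middleware above: the curried receiver and the per-request cleanup. The `actor` field and the process_response body are illustrative, not taken from the original project:

def set_user(self, user, sender, instance, **kwargs):
    # `user` was bound by curry() in process_request; attach it to
    # the LogAction row before it is saved.
    if instance.actor is None:
        instance.actor = user

def process_response(self, request, response):
    # Disconnect the receiver so it cannot leak into the next request
    # handled by this thread.
    if hasattr(threadlocal, 'actionslog'):
        pre_save.disconnect(sender=LogAction,
                            dispatch_uid=threadlocal.actionslog['signal_duid'])
    return response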
def __exit__(self, type, value, traceback):
try:
self.exit(type, value, traceback)
finally:
final_contexts = _state.contexts
_state.contexts = self.old_contexts
# Generator coroutines and with-statements with non-local
# effects interact badly. Check here for signs of
# the stack getting out of sync.
# Note that this check comes after restoring _state.context
# so that if it fails things are left in a (relatively)
# consistent state.
if final_contexts is not self.new_contexts:
raise StackContextInconsistentError(
'stack_context inconsistency (may be caused by yield '
'within a "with StackContext" block)')
# Break up a reference to itself to allow for faster GC on CPython.
self.new_contexts = None
def __init__(self, parent, overrides=None, threadsafe=False):
self.parent = parent
self._data = {} if overrides is None else overrides
self._old_data = None
# merge self.global_data into self._data
if threadsafe:
for k, v in six.iteritems(self.parent.global_data):
if k not in self._data:
# A deepcopy is necessary to avoid using the same
# objects in globals as we do in thread local storage.
# Otherwise, changing one would automatically affect
# the other.
self._data[k] = copy.deepcopy(v)
else:
for k, v in six.iteritems(self.parent.global_data):
if k not in self._data:
self._data[k] = v
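Why the deepcopy matters: a shallow copy still shares nested mutable objects, so a thread mutating its "local" copy would leak the change into the globals. A quick standalone illustration:

import copy

global_data = {'headers': {'X-Env': 'prod'}}

shallow = dict(global_data)    # copies the outer dict, shares the inner one
isolated = {k: copy.deepcopy(v) for k, v in global_data.items()}

shallow['headers']['X-Env'] = 'staging'
print(global_data['headers']['X-Env'])   # 'staging' -- the change leaked through
print(isolated['headers']['X-Env'])      # 'prod'    -- unaffected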
def template_localtime(value, use_tz=None):
"""
Checks if value is a datetime and converts it to local time if necessary.
If use_tz is provided and is not None, that will force the value to
be converted (or not), overriding the value of settings.USE_TZ.
This function is designed for use by the template engine.
"""
should_convert = (isinstance(value, datetime)
and (settings.USE_TZ if use_tz is None else use_tz)
and not is_naive(value)
and getattr(value, 'convert_to_local_time', True))
return localtime(value) if should_convert else value
# Utilities
def localtime(value, timezone=None):
"""
Converts an aware datetime.datetime to local time.
Local time is defined by the current time zone, unless another time zone
is specified.
"""
if timezone is None:
timezone = get_current_timezone()
# If `value` is naive, astimezone() will raise a ValueError,
# so we don't need to perform a redundant check.
value = value.astimezone(timezone)
if hasattr(timezone, 'normalize'):
# This method is available for pytz time zones.
value = timezone.normalize(value)
return value
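The normalize() branch exists because pytz zones need a post-astimezone() fixup around DST transitions. A standalone sketch of the same conversion, assuming pytz is installed:

from datetime import datetime
import pytz

paris = pytz.timezone('Europe/Paris')
aware = datetime(2017, 6, 1, 12, 0, tzinfo=pytz.utc)

value = aware.astimezone(paris)
value = paris.normalize(value)   # pytz zones define normalize(); most others don't
print(value.isoformat())         # 2017-06-01T14:00:00+02:00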
def __addcookies(self):
'''Add cookies from self.cookies to request in self.local.h
'''
for cname, morsel in self.cookies.iteritems():
attrs = []
value = morsel.get('version', '')
if value != '' and value != '0':
attrs.append('$Version=%s' % value)
attrs.append('%s=%s' % (cname, morsel.coded_value))
value = morsel.get('path')
if value:
attrs.append('$Path=%s' % value)
value = morsel.get('domain')
if value:
attrs.append('$Domain=%s' % value)
self.local.h.putheader('Cookie', "; ".join(attrs))
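The loop above hand-builds an RFC 2109 style Cookie header. A standalone sketch of the same assembly using the stdlib cookie jar (http.cookies in Python 3):

from http.cookies import SimpleCookie

jar = SimpleCookie()
jar['session'] = 'abc123'
jar['session']['path'] = '/api'

attrs = []
for cname, morsel in jar.items():
    attrs.append('%s=%s' % (cname, morsel.coded_value))
    if morsel['path']:
        attrs.append('$Path=%s' % morsel['path'])
print('; '.join(attrs))   # session=abc123; $Path=/api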
def Receive(self, replytype, **kw):
    '''Parse the message and create a Python object.
    Keyword data:
        faults -- list of WSDL operation.fault typecodes
        wsaction -- if using WS-Addressing, the Action value we expect
            to receive.
    '''
self.ReceiveSOAP(**kw)
if self.local.ps.IsAFault():
msg = FaultFromFaultMessage(self.local.ps)
raise FaultException(msg)
tc = replytype
if hasattr(replytype, 'typecode'):
tc = replytype.typecode
reply = self.local.ps.Parse(tc)
if self.address is not None:
self.address.checkResponse(self.local.ps, kw.get('wsaction'))
return reply
def __parse_child(self, node):
    '''For rpc-style, map each message part to a class in typesmodule.
    '''
    try:
        tc = self.gettypecode(self.typesmodule, node)
    except Exception:
        self.logger.debug('didn\'t find typecode for "%s" in typesmodule: %s',
                          node.localName, self.typesmodule)
        tc = TC.Any(aslist=1)
        return tc.parse(node, self.local.ps)
    self.logger.debug('parse child with typecode : %s', tc)
    try:
        return tc.parse(node, self.local.ps)
    except Exception:
        self.logger.debug('parse failed, trying Any : %s', tc)
        tc = TC.Any(aslist=1)
        return tc.parse(node, self.local.ps)
# request_context.py, from the opentracing-python-instrumentation project (uber-common)
def __init__(self, *args, **kwargs):
class LocalContexts(threading.local):
def __init__(self):
super(LocalContexts, self).__init__()
self._contexts = []
def append(self, item):
self._contexts.append(item)
def pop(self):
return self._contexts.pop()
super(ThreadSafeStackContext, self).__init__(*args, **kwargs)
if hasattr(self, 'contexts'):
        # only patch if the parent created a `contexts` attribute
self.contexts = LocalContexts()
def load(self):
# Apply extra parameters before loading the configs
self.register_extra_parameters()
globalConfigName = ".dallingerconfig"
globalConfig = os.path.expanduser(os.path.join("~/", globalConfigName))
localConfig = os.path.join(os.getcwd(), LOCAL_CONFIG)
defaults_folder = os.path.join(os.path.dirname(__file__), "default_configs")
local_defaults_file = os.path.join(defaults_folder, "local_config_defaults.txt")
global_defaults_file = os.path.join(defaults_folder, "global_config_defaults.txt")
# Load the configuration, with local parameters overriding global ones.
for config_file in [
global_defaults_file,
local_defaults_file,
globalConfig,
]:
self.load_from_file(config_file)
if os.path.exists(localConfig):
self.load_from_file(localConfig)
self.load_from_environment()
self.ready = True
def __init__(self, context, core_src):
self._context = context
self._present = {'mitogen': [
'mitogen.ansible',
'mitogen.compat',
'mitogen.compat.pkgutil',
'mitogen.fakessh',
'mitogen.master',
'mitogen.ssh',
'mitogen.sudo',
'mitogen.utils',
]}
self.tls = threading.local()
self._cache = {}
if core_src:
self._cache['mitogen.core'] = (
None,
'mitogen/core.py',
zlib.compress(core_src),
)
def __init__(self, maxusage=None, setsession=None,
closeable=False, threadlocal=None, *args, **kwargs):
"""Set up the persistent PostgreSQL connection generator.
maxusage: maximum number of reuses of a single connection
(0 or None means unlimited reuse)
When this maximum usage number of the connection is reached,
the connection is automatically reset (closed and reopened).
setsession: optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
closeable: if this is set to true, then closing connections will
be allowed, but by default this will be silently ignored
threadlocal: an optional class for representing thread-local data
that will be used instead of our Python implementation
(threading.local is faster, but cannot be used in all cases)
args, kwargs: the parameters that shall be used to establish
the PostgreSQL connections using class PyGreSQL pg.DB()
"""
self._maxusage = maxusage
self._setsession = setsession
self._closeable = closeable
self._args, self._kwargs = args, kwargs
self.thread = (threadlocal or local)()
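Illustrative usage, following the DBUtils PersistentPg API that this docstring describes; the connection parameters are placeholders:

from DBUtils.PersistentPg import PersistentPg

persist = PersistentPg(maxusage=1000,
                       setsession=["set datestyle to german"],
                       dbname='mydb', user='me')
db = persist.connection()   # each thread gets its own persistent pg.DB()
print(db.query('select 1').getresult())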
def wrapping(wrapped):
# A decorator to decorate a decorator's wrapper. Following the lead
# of Twisted and Monocle, this is supposed to make debugging heavily
# decorated code easier. We'll see...
# TODO(pcostello): This copies the functionality of functools.wraps
# following the patch in http://bugs.python.org/issue3445. We can replace
# this once upgrading to python 3.3.
def wrapping_wrapper(wrapper):
try:
wrapper.__wrapped__ = wrapped
wrapper.__name__ = wrapped.__name__
wrapper.__doc__ = wrapped.__doc__
wrapper.__dict__.update(wrapped.__dict__)
# Local functions won't have __module__ attribute.
if hasattr(wrapped, '__module__'):
wrapper.__module__ = wrapped.__module__
except Exception:
pass
return wrapper
return wrapping_wrapper
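A hypothetical use of wrapping(): writing a decorator whose wrapper keeps the wrapped function's metadata, much like functools.wraps:

def logged(func):
    @wrapping(func)
    def wrapper(*args, **kwargs):
        print('calling %s' % func.__name__)
        return func(*args, **kwargs)
    return wrapper

@logged
def add(a, b):
    """Return a + b."""
    return a + b

print(add(2, 3))      # calling add, then 5
print(add.__name__)   # 'add' -- copied over by wrapping()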
# Define a base class for classes that need to be thread-local.
# This is pretty subtle; we want to use threading.local if threading
# is supported, but object if it is not.
def set_thread_local(var_name, val):
    if val is None and has_thread_local(var_name):
        gl_storage = _get_greenlet_local_storage()
        if gl_storage:
            # Delete the variable from greenlet-local storage.
            del gl_storage[var_name]
            # If that was the last entry, drop the now-empty greenlet
            # storage from thread-local storage as well.
            if not gl_storage:
                del _th_loc_storage.greenlet_locals[corolocal.get_ident()]
    if val is not None:
        gl_storage = _get_greenlet_local_storage()
        if not gl_storage:
            gl_storage = _th_loc_storage.greenlet_locals[
                corolocal.get_ident()] = {}
        gl_storage[var_name] = val
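The helpers this function relies on are not shown; a plausible sketch of the layout they assume (per-greenlet dicts keyed by eventlet's corolocal.get_ident(), hanging off a thread-local):

import threading
from eventlet import corolocal

_th_loc_storage = threading.local()

def _get_greenlet_local_storage():
    # One dict per greenlet, keyed by greenlet id, inside a thread-local.
    greenlet_locals = getattr(_th_loc_storage, 'greenlet_locals', None)
    if greenlet_locals is None:
        _th_loc_storage.greenlet_locals = greenlet_locals = {}
    return greenlet_locals.get(corolocal.get_ident())

def has_thread_local(var_name):
    gl_storage = _get_greenlet_local_storage()
    return gl_storage is not None and var_name in gl_storage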
def __init__(self, threads=4, *args, **kwargs):
self.local = threading.local()
super(ThreadBaseScheduler, self).__init__(*args, **kwargs)
if isinstance(self.taskdb, SQLiteMixin):
self.threads = 1
else:
self.threads = threads
self._taskdb = self.taskdb
self._projectdb = self.projectdb
self._resultdb = self.resultdb
self.thread_objs = []
self.thread_queues = []
self._start_threads()
assert len(self.thread_queues) > 0
def __init__(self, username, password):
self.username = username
self.password = password
# Keep state in per-thread local storage
self._thread_local = threading.local()
def acquire(self, blocking=None, blocking_timeout=None):
"""
Use Redis to hold a shared, distributed lock named ``name``.
Returns True once the lock is acquired.
If ``blocking`` is False, always return immediately. If the lock
was acquired, return True, otherwise return False.
``blocking_timeout`` specifies the maximum number of seconds to
wait trying to acquire the lock.
"""
sleep = self.sleep
token = b(uuid.uuid1().hex)
if blocking is None:
blocking = self.blocking
if blocking_timeout is None:
blocking_timeout = self.blocking_timeout
stop_trying_at = None
if blocking_timeout is not None:
stop_trying_at = mod_time.time() + blocking_timeout
while 1:
if self.do_acquire(token):
self.local.token = token
return True
if not blocking:
return False
if stop_trying_at is not None and mod_time.time() > stop_trying_at:
return False
mod_time.sleep(sleep)
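Illustrative usage of the blocking semantics, assuming a redis-py style client; `redis_client` and do_work are placeholders:

lock = redis_client.lock('resource:42', timeout=10, sleep=0.1)
if lock.acquire(blocking=True, blocking_timeout=5):
    try:
        do_work()
    finally:
        lock.release()
else:
    print('could not acquire the lock within 5 seconds')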