def _decref(token, authkey, state, tls, idset, _Client):
    """Drop one reference to ``token`` and close the idle connection.

    ``token.id`` is discarded from ``idset``. When ``state`` is None or its
    ``value`` equals ``State.STARTED``, a ``'decref'`` request for the token
    is dispatched over a fresh ``_Client`` connection; a failure there is
    only logged through ``util.debug`` and never propagated. Finally, if
    ``idset`` is empty and ``tls`` carries a ``connection`` attribute, that
    connection is closed and removed from ``tls``.
    """
    idset.discard(token.id)

    # check whether manager is still alive
    if state is None or state.value == State.STARTED:
        # tell manager this process no longer cares about referent
        try:
            util.debug('DECREF %r', token.id)
            conn = _Client(token.address, authkey=authkey)
            dispatch(conn, None, 'decref', (token.id,))
        except Exception as e:
            # "except ... as" parses on Python 2.6+ and Python 3 alike; the
            # older comma form is a syntax error on Python 3.
            util.debug('... decref failed %s', e)
    else:
        util.debug('DECREF %r -- manager already shutdown', token.id)

    # check whether we can close this thread's connection because
    # the process owns no more references to objects for this manager
    if not idset and hasattr(tls, 'connection'):
        util.debug('thread %r has no more proxies so closing conn',
                   threading.current_thread().name)
        tls.connection.close()
        del tls.connection
# Example source code using Python's current_thread()
def __repr__(self):
    """Return ``'<Lock(owner=...)>'`` describing who appears to hold the lock.

    The owner is the current process name (suffixed with ``|<thread name>``
    outside ``MainThread``) when the semlock reports it is ours, ``'None'``
    when the semlock value is 1, otherwise ``'SomeOtherThread'`` or
    ``'SomeOtherProcess'`` depending on the semlock count. Any exception
    while probing yields ``'unknown'``.
    """
    try:
        semlock = self._semlock
        if semlock._is_mine():
            owner = current_process().name
            thread_name = threading.current_thread().name
            if thread_name != 'MainThread':
                owner = owner + '|' + thread_name
        elif semlock._get_value() == 1:
            owner = 'None'
        else:
            if semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
    except Exception:
        owner = 'unknown'
    return '<Lock(owner=%s)>' % owner
#
# Recursive lock
#
def __new__(cls, *args, **kw):
    """Allocate the instance and register the creating thread's dict.

    Stores a per-instance key, the constructor arguments and a fresh
    ``RLock`` on the object, rejects constructor arguments when the class
    does not override ``__init__`` (``TypeError``), and files the instance
    ``__dict__`` under the key in the current thread's ``__dict__``.
    """
    instance = object.__new__(cls)
    store = object.__setattr__
    local_key = ('_local__key', 'thread.local.' + str(id(instance)))
    store(instance, '_local__key', local_key)
    store(instance, '_local__args', (args, kw))
    store(instance, '_local__lock', RLock())

    if (args or kw) and (cls.__init__ is object.__init__):
        raise TypeError("Initialization arguments are not supported")

    # We need to create the thread dict in anticipation of
    # __init__ being called, to make sure we don't call it
    # again ourselves.
    own_dict = object.__getattribute__(instance, '__dict__')
    current_thread().__dict__[local_key] = own_dict
    return instance
def _patch(self):
    """Point ``self.__dict__`` at the calling thread's private dict.

    If the current thread already has a dict stored under this object's
    key, it is installed as the instance ``__dict__``. Otherwise a new
    empty dict is created, stored on the thread, installed, and the class
    ``__init__`` (when overridden) is invoked with the saved constructor
    arguments.
    """
    key = object.__getattribute__(self, '_local__key')
    thread_namespace = current_thread().__dict__
    per_thread = thread_namespace.get(key)

    if per_thread is not None:
        # This thread has used the object before: just swap the dict in.
        object.__setattr__(self, '__dict__', per_thread)
        return

    per_thread = {}
    thread_namespace[key] = per_thread
    object.__setattr__(self, '__dict__', per_thread)

    # we have a new instance dict, so call out __init__ if we have
    # one
    klass = type(self)
    if klass.__init__ is not object.__init__:
        init_args, init_kw = object.__getattribute__(self, '_local__args')
        klass.__init__(self, *init_args, **init_kw)
def loop_parse_news_flashes(self, coll):
    """Drain queued ids from Redis and store unseen news items in ``coll``.

    While the Redis set ``self.key`` is non-empty, an id is popped from the
    set named by ``self.generate_key()``, the matching page (20 items) is
    fetched via ``self.p_get`` and each item not yet present in ``coll``
    (looked up by ``_id``) is saved with ``self.save_doc``. Errors are
    printed and the loop carries on.
    """
    while REDIS_CLIENT.scard(self.key) > 0:
        try:
            popped_id = REDIS_CLIENT.spop(self.generate_key())
            per_page = 20
            page_url = self.news_flashes_tpl % (popped_id.decode("utf-8"), per_page)
            print(threading.current_thread().name + ' is crawling ' + page_url)
            body = self.p_get(page_url).text
            payload = json.loads(body)
            for item in payload['data']['items']:
                # Reuse the upstream id as the collection primary key.
                item.update({'_id': item['id']})
                try:
                    already_stored = coll.find_one({'_id': item['id']})
                    if already_stored is None:
                        self.save_doc(coll, item)
                except Exception as save_error:
                    print(save_error)
        except Exception as page_error:
            print(page_error)
    print(threading.current_thread().name + ' is finish')
def cron_task_host():
    """Run ``task_scheduler`` about once a minute until cron tasks are disabled.

    Every pass first checks ``enable_cron_tasks``. When it is false the host
    stops: on the main thread the function returns, on any other thread it
    raises ``SystemExit`` to end that thread. Otherwise it sleeps 60 seconds
    and calls ``task_scheduler.run()``; an ``Exception`` raised by the run is
    reported through ``errprint`` and ``traceback.print_exc`` and the loop
    continues.
    """
    while True:
        # Stop hosting as soon as cron tasks have been switched off.
        if not enable_cron_tasks:
            if threading.current_thread() != threading.main_thread():
                # Raise directly rather than calling the site-provided
                # exit() helper: that helper also closes sys.stdin and is
                # not installed when the interpreter runs with -S.
                raise SystemExit
            else:
                return

        sleep(60)
        try:
            task_scheduler.run()
        except Exception:  # coverage: exclude
            # Only ordinary errors are swallowed; SystemExit and
            # KeyboardInterrupt are allowed to propagate.
            errprint('ErrorDuringExecutingCronTasks')
            traceback.print_exc()
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def init_task_pool(self):
    """Build ``self.numjobs`` consumers and hand each one the shared queue.

    Consumers come from ``Runner.get_pool()`` and are numbered through their
    ``idx`` attribute. A new ``Queue(0)`` becomes ``self.ready``, and every
    consumer receives the ``setq`` callable on its own ``ready`` queue; when
    called with a consumer, ``setq`` rebinds that consumer's ``ready`` to
    ``self.ready`` and copies its ``idx`` onto the current thread.

    Returns the list of consumers (also stored as ``self.pool``).
    """
    # lazy creation, and set a common pool for all task consumers
    consumers = self.pool = []
    for position in range(self.numjobs):
        worker = Runner.get_pool()
        consumers.append(worker)
        worker.idx = position

    self.ready = Queue(0)

    def setq(consumer):
        consumer.ready = self.ready
        try:
            threading.current_thread().idx = consumer.idx
        except Exception as err:
            print(err)

    for worker in consumers:
        worker.ready.put(setq)
    return consumers
def worker_exec(self, queue_timeout=2, **kwargs):
    """Forward page urls from ``self.in_queue`` to ``self.output`` until told to stop.

    The loop ends when ``self.signal`` reports ``reach_max_num``, or when the
    input queue stays empty for ``queue_timeout`` seconds and
    ``feeder_exited`` is set. An empty queue without ``feeder_exited`` is
    retried; any other error while fetching from the queue is logged and
    retried as well. Each url obtained is passed on as
    ``{'file_url': url}``.

    Args:
        queue_timeout: Seconds to wait for an item from ``self.in_queue``.
        **kwargs: Accepted but not used by this method.
    """
    while True:
        if self.signal.get('reach_max_num'):
            self.logger.info('downloaded image reached max num, thread %s'
                             ' exit', threading.current_thread().name)
            break
        try:
            url = self.in_queue.get(timeout=queue_timeout)
        except queue.Empty:
            if self.signal.get('feeder_exited'):
                self.logger.info('no more page urls to parse, thread %s'
                                 ' exit', threading.current_thread().name)
                break
            else:
                self.logger.info('%s is waiting for new page urls',
                                 threading.current_thread().name)
                continue
        except Exception as e:
            self.logger.error('exception caught in thread %s: %s',
                              threading.current_thread().name, e)
            continue
        else:
            # Lazy %-style arguments, like every other log call here, so the
            # message is only built when debug logging is enabled.
            self.logger.debug('start downloading page %s', url)
            self.output({'file_url': url})
def wamp_request(e, kwarg, session):
    """Issue a WAMP RPC and publish its outcome for the calling thread.

    A ``{'result': None}`` slot is created in ``shared_result`` under the
    current thread's ident. The call named by ``kwarg['wamp_rpc_call']`` is
    made through ``session.wamp_session`` with ``kwarg['data']`` as
    positional arguments; the success and failure callbacks store the
    outcome in the slot, log it, and set the event ``e``.
    """
    caller_ident = threading.current_thread().ident
    shared_result[caller_ident] = {}
    shared_result[caller_ident]['result'] = None

    def success(reply):
        shared_result[caller_ident]['result'] = reply
        LOG.debug("DEVICE sent: %s", str(reply))
        e.set()
        return shared_result[caller_ident]['result']

    def fail(failure):
        shared_result[caller_ident]['result'] = failure
        LOG.error("WAMP FAILURE: %s", str(failure))
        e.set()
        return shared_result[caller_ident]['result']

    LOG.debug("Calling %s...", kwarg['wamp_rpc_call'])
    deferred = session.wamp_session.call(
        wamp_session_caller, kwarg['wamp_rpc_call'], *kwarg['data'])
    deferred.addCallback(success)
    deferred.addErrback(fail)
# OSLO ENDPOINT
def wait(self):
    """Wait for every timer and every thread other than the current one.

    ``GreenletExit`` raised by a ``wait()`` call is ignored; any other
    exception is logged with ``LOG.exception`` and waiting continues with
    the next item.
    """
    def _wait_quietly(waitable):
        # Shared wait-and-report logic for timers and threads.
        try:
            waitable.wait()
        except eventlet.greenlet.GreenletExit:
            pass
        except Exception as ex:
            LOG.exception(ex)

    for timer in self.timers:
        _wait_quietly(timer)

    me = threading.current_thread()

    # Iterate over a copy of self.threads so thread_done doesn't
    # modify the list while we're iterating
    for thread in self.threads[:]:
        if thread is not me:
            _wait_quietly(thread)
def __cprint(*args, **kwargs):
    """ Color print()

    Signature like Python 3 print() function
    print([object, ...][, sep=' '][, end='\\n'][, file=sys.stdout])

    Nothing is queued when the ``verbose`` keyword is false. The target
    stream is the last one registered for the current thread in
    ``thread_output_stream``, falling back to the ``file`` keyword (default
    ``sys.stdout``). With ``color`` set, the text is wrapped in the matching
    escape sequence and a reset sequence.
    """
    if not kwargs.pop("verbose", True):
        return

    color = kwargs.get('color', None)
    sep = kwargs.get('sep', ' ')
    end = kwargs.get('end', '\n')
    thread = threading.current_thread()

    try:
        file_ = thread_output_stream.get(thread, ())[-1]
    except IndexError:
        file_ = kwargs.get('file', sys.stdout)

    # Keyword arguments shared by every queued print request.
    common = dict(file=file_, sep=sep, thread=thread)

    if not color:
        printer_queue.put(PrintResource(content=args, end=end, **common))
        return

    printer_queue.put(PrintResource(content='\033[{}m'.format(colors[color]), end='', **common))
    printer_queue.put(PrintResource(content=args, end='', **common))  # TODO printing text that starts from newline
    printer_queue.put(PrintResource(content='\033[0m', end=end, **common))
def _decref(token, authkey, state, tls, idset, _Client):
    """Drop one reference to ``token`` and close the idle connection.

    ``token.id`` is discarded from ``idset``. When ``state`` is None or its
    ``value`` equals ``State.STARTED``, a ``'decref'`` request for the token
    is dispatched over a fresh ``_Client`` connection; a failure there is
    only logged through ``util.debug`` and never propagated. Finally, if
    ``idset`` is empty and ``tls`` carries a ``connection`` attribute, that
    connection is closed and removed from ``tls``.
    """
    idset.discard(token.id)

    # check whether manager is still alive
    if state is None or state.value == State.STARTED:
        # tell manager this process no longer cares about referent
        try:
            util.debug('DECREF %r', token.id)
            conn = _Client(token.address, authkey=authkey)
            dispatch(conn, None, 'decref', (token.id,))
        except Exception as e:
            # "except ... as" parses on Python 2.6+ and Python 3 alike; the
            # older comma form is a syntax error on Python 3.
            util.debug('... decref failed %s', e)
    else:
        util.debug('DECREF %r -- manager already shutdown', token.id)

    # check whether we can close this thread's connection because
    # the process owns no more references to objects for this manager
    if not idset and hasattr(tls, 'connection'):
        util.debug('thread %r has no more proxies so closing conn',
                   threading.current_thread().name)
        tls.connection.close()
        del tls.connection
def __repr__(self):
    """Return ``'<RLock(owner, count)>'`` describing the apparent holder.

    When the semlock reports it is ours, the owner is the current process
    name (suffixed with ``|<thread name>`` outside ``MainThread``) and the
    count is the semlock count. A semlock value of 1 gives ``None`` / 0;
    otherwise the owner is ``SomeOtherThread`` or ``SomeOtherProcess`` with
    count ``nonzero``. Any exception while probing yields ``unknown`` for
    both fields.
    """
    try:
        semlock = self._semlock
        if semlock._is_mine():
            owner = current_process().name
            thread_name = threading.current_thread().name
            if thread_name != 'MainThread':
                owner = owner + '|' + thread_name
            depth = semlock._count()
        elif semlock._get_value() == 1:
            owner = 'None'
            depth = 0
        else:
            if semlock._count() > 0:
                owner = 'SomeOtherThread'
            else:
                owner = 'SomeOtherProcess'
            depth = 'nonzero'
    except Exception:
        owner = depth = 'unknown'
    return '<RLock(%s, %s)>' % (owner, depth)
#
# Condition variable
#
def readerThread(self, d, readerNum):
    """Scan every record of ``d`` five times, checking each value.

    Each pass opens a cursor, walks it from ``first()`` via ``next()`` until
    a false record is returned, asserts that every value equals
    ``self.makeData(key)``, and closes the cursor. Progress is printed when
    the module-level ``verbose`` flag is set.

    Args:
        d: Database object providing ``cursor()``.
        readerNum: Not used by this method.
    """
    if sys.version_info[0] < 3:
        name = currentThread().getName()
    else:
        name = currentThread().name

    # range() rather than xrange(): the latter does not exist on Python 3.
    for _ in range(5):
        c = d.cursor()
        count = 0
        rec = c.first()
        while rec:
            count += 1
            key, data = rec
            self.assertEqual(self.makeData(key), data)
            rec = c.next()
        if verbose:
            # print(...) with one pre-formatted string behaves the same on
            # Python 2 and 3; the statement form is a syntax error on 3.
            print("%s: found %d records" % (name, count))
        c.close()

    if verbose:
        print("%s: thread finished" % name)
def writerThread(self, d, keys, readers):
    """Write one record per key into ``d`` and start readers along the way.

    Every key ``x`` is stored as ``'%04d' % x`` with the value
    ``self.makeData(key)`` through ``dbutils.DeadlockWrap`` (up to 12
    retries). After each ``len(keys) // len(readers)`` writes, one reader is
    popped from ``readers`` and started. Progress is printed when the
    module-level ``verbose`` flag is set.

    Args:
        d: Database object providing ``put``.
        keys: Integer keys to write.
        readers: Objects with ``start()``; the list is consumed from its end.
    """
    if sys.version_info[0] < 3:
        name = currentThread().getName()
    else:
        name = currentThread().name

    # The progress messages previously used ``start``/``stop`` names that
    # were never defined, raising NameError in verbose mode. Derive the
    # range from the keys actually being written.
    start = min(keys) if keys else 0
    stop = max(keys) if keys else 0

    if verbose:
        print("%s: creating records %d - %d" % (name, start, stop))

    count = len(keys) // len(readers)
    count2 = count
    for x in keys:
        key = '%04d' % x
        dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                             max_retries=12)
        if verbose and x % 100 == 0:
            print("%s: records %d - %d finished" % (name, start, x))

        # Release the next reader once another batch has been written.
        count2 -= 1
        if not count2:
            readers.pop().start()
            count2 = count

    if verbose:
        print("%s: thread finished" % name)
def readerThread(self, d, readerNum):
    """Scan every record of ``d`` once, checking each value.

    A cursor is walked from ``first`` via ``next`` (both invoked through
    ``dbutils.DeadlockWrap`` with up to 10 retries) until a false record is
    returned; every value must equal ``self.makeData(key)``. The cursor is
    closed afterwards. Progress is printed when the module-level ``verbose``
    flag is set.

    Args:
        d: Database object providing ``cursor()``.
        readerNum: Not used by this method.
    """
    if sys.version_info[0] < 3:
        name = currentThread().getName()
    else:
        name = currentThread().name

    c = d.cursor()
    count = 0
    rec = dbutils.DeadlockWrap(c.first, max_retries=10)
    while rec:
        count += 1
        key, data = rec
        self.assertEqual(self.makeData(key), data)
        rec = dbutils.DeadlockWrap(c.next, max_retries=10)
    if verbose:
        # print(...) with one pre-formatted string behaves the same on
        # Python 2 and 3; the statement form is a syntax error on 3.
        print("%s: found %d records" % (name, count))
    c.close()

    if verbose:
        print("%s: thread finished" % name)
def __new__(cls, *args, **kw):
    """Allocate the instance and register the creating thread's dict.

    Stores a per-instance key, the constructor arguments and a fresh
    ``RLock`` on the object, rejects constructor arguments when the class
    does not override ``__init__`` (``TypeError``), and files the instance
    ``__dict__`` under the key in the current thread's ``__dict__``.
    """
    instance = object.__new__(cls)
    store = object.__setattr__
    local_key = ('_local__key', 'thread.local.' + str(id(instance)))
    store(instance, '_local__key', local_key)
    store(instance, '_local__args', (args, kw))
    store(instance, '_local__lock', RLock())

    if (args or kw) and (cls.__init__ is object.__init__):
        raise TypeError("Initialization arguments are not supported")

    # We need to create the thread dict in anticipation of
    # __init__ being called, to make sure we don't call it
    # again ourselves.
    own_dict = object.__getattribute__(instance, '__dict__')
    current_thread().__dict__[local_key] = own_dict
    return instance
def _patch(self):
    """Point ``self.__dict__`` at the calling thread's private dict.

    If the current thread already has a dict stored under this object's
    key, it is installed as the instance ``__dict__``. Otherwise a new
    empty dict is created, stored on the thread, installed, and the class
    ``__init__`` (when overridden) is invoked with the saved constructor
    arguments.
    """
    key = object.__getattribute__(self, '_local__key')
    thread_namespace = current_thread().__dict__
    per_thread = thread_namespace.get(key)

    if per_thread is not None:
        # This thread has used the object before: just swap the dict in.
        object.__setattr__(self, '__dict__', per_thread)
        return

    per_thread = {}
    thread_namespace[key] = per_thread
    object.__setattr__(self, '__dict__', per_thread)

    # we have a new instance dict, so call out __init__ if we have
    # one
    klass = type(self)
    if klass.__init__ is not object.__init__:
        init_args, init_kw = object.__getattribute__(self, '_local__args')
        klass.__init__(self, *init_args, **init_kw)
def _worker(self):
    """Run queued tasks until the ``StopWorker`` marker is received.

    Each task's ``function`` is called with its ``args`` and ``kw``. Any
    exception is logged, and the ``jobs`` counter is decremented under
    ``self._lock`` whether or not the call succeeded. On exit the current
    thread removes itself from ``self._threads``.
    """
    me = current_thread()
    while True:
        job = self._queue.get()
        if job is self.StopWorker:
            break
        # noinspection PyBroadException
        try:
            job.function(*job.args, **job.kw)
        except:
            log.exception(
                'Unhandled exception while calling {!r} in the {!r} thread'.format(job.function, me.name))
        finally:
            with self._lock:
                self.__dict__['jobs'] -= 1
        # Drop the reference before blocking on the queue again.
        del job
    self._threads.remove(me)
def _thread_wrapper(self, *args):
    """Pull items off ``self.q`` and run ``self.module_thread`` on each.

    The loop polls the queue without blocking until ``self.stopped`` is set.
    An exception raised by ``module_thread`` is reported through
    ``self.print_exception`` and does not end the thread; ``task_done`` is
    called for every item taken. Start and exit are reported via
    ``self.debug``.
    """
    worker_name = threading.current_thread().name
    self.debug('THREAD => %s started.' % worker_name)
    while True:
        if self.stopped.is_set():
            break
        try:
            # use the get_nowait() method for retrieving a queued item to
            # prevent the thread from blocking when the queue is empty
            item = self.q.get_nowait()
        except Empty:
            continue
        try:
            # launch the public module_thread method
            self.module_thread(item, *args)
        except:
            # handle exceptions local to the thread
            self.print_exception('(thread=%s, object=%r)' % (worker_name, item))
        finally:
            self.q.task_done()
    self.debug('THREAD => %s exited.' % worker_name)
# sometimes a keyboardinterrupt causes a race condition between when the self.q.task_done() call above and the
# self.q.empty() call below, causing all the threads to hang. introducing the time.sleep(.7) call below reduces
# the likelihood of encountering the race condition.
def close(self):
    """
    Closes our socket connection. This is a pass-through for our socket's
    :func:`~stem.socket.ControlSocket.close` method.

    Afterwards every live state-change listener thread, other than the
    calling thread, is joined.
    """
    self._socket.close()

    # Join on any outstanding state change listeners. Closing is a state change
    # of its own, so if we have any listeners it's quite likely there's some
    # work in progress.
    #
    # It's important that we do this outside of our locks so those daemons have
    # access to us. This is why we're doing this here rather than _close().
    for listener in self._state_change_threads:
        if listener.is_alive():
            # Never join the thread we are running on.
            if threading.current_thread() != listener:
                listener.join()
def _close(self):
    """Wake and join the helper threads, then announce the closed state.

    Sets the event notice, clears the authenticated flag, joins the reader
    and event threads when they exist, are alive and are not the calling
    thread, notifies status listeners with ``State.CLOSED`` and finally
    calls ``self._socket_close()``.
    """
    # Our is_alive() state is now false. Our reader thread should already be
    # awake from recv() raising a closure exception. Wake up the event thread
    # too so it can end.
    self._event_notice.set()
    self._is_authenticated = False

    # joins on our threads if it's safe to do so
    helpers = (self._reader_thread, self._event_thread)
    for helper in helpers:
        if not helper:
            continue
        if helper.is_alive() and threading.current_thread() != helper:
            helper.join()

    self._notify_status_listeners(State.CLOSED)
    self._socket_close()
def __cprint(*args, **kwargs):
    """ Color print()

    Signature like Python 3 print() function
    print([object, ...][, sep=' '][, end='\\n'][, file=sys.stdout])

    Nothing is queued when the ``verbose`` keyword is false. The target
    stream is the last one registered for the current thread in
    ``thread_output_stream``, falling back to the ``file`` keyword (default
    ``sys.stdout``). With ``color`` set, the text is wrapped in the matching
    escape sequence and a reset sequence.
    """
    if not kwargs.pop("verbose", True):
        return

    color = kwargs.get('color', None)
    sep = kwargs.get('sep', ' ')
    end = kwargs.get('end', '\n')
    thread = threading.current_thread()

    try:
        file_ = thread_output_stream.get(thread, ())[-1]
    except IndexError:
        file_ = kwargs.get('file', sys.stdout)

    # Keyword arguments shared by every queued print request.
    common = dict(file=file_, sep=sep, thread=thread)

    if not color:
        printer_queue.put(PrintResource(content=args, end=end, **common))
        return

    printer_queue.put(PrintResource(content='\033[{}m'.format(colors[color]), end='', **common))
    printer_queue.put(PrintResource(content=args, end='', **common))  # TODO printing text that starts from newline
    printer_queue.put(PrintResource(content='\033[0m', end=end, **common))