import threading
import time

def myChildThread():
    print("Child Thread Starting")
    time.sleep(5)
    print("Current Thread ----------")
    print(threading.current_thread())
    print("-------------------------")
    print("Main Thread -------------")
    print(threading.main_thread())
    print("-------------------------")
    print("Child Thread Ending")
from threading import current_thread, main_thread

async def test_call_in_executor(executor):
    """Test that call_in_executor actually runs the target in a worker thread."""
    assert not await call_in_executor(lambda: current_thread() is main_thread(),
                                      executor=executor)
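The call_in_executor helper itself is not shown in this excerpt. A minimal sketch of such a helper, assuming it simply wraps the standard loop.run_in_executor (the real implementation may differ):

import asyncio
from functools import partial

async def call_in_executor(func, *args, executor=None):
    # Hypothetical helper: run func(*args) in the given executor (or the
    # loop's default thread pool when executor is None) and await the result.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(executor, partial(func, *args))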
def handleOOB(self):
    # ALTERNATIVE: Put the message on a Queue for the main thread.
    assert threading.current_thread() is threading.main_thread()
    for oob in self.torms.inOOB:
        self.logger.warning('\t\t!!!!!!!!!!!!!!!!!!!!!!!! %s' % oob)
    self.torms.clearOOB()
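The ALTERNATIVE comment refers to the common pattern of handing messages to the main thread through a queue instead of asserting that you are already on it. A generic sketch of that pattern (names are mine, not from this codebase):

import queue
import threading

oob_queue = queue.Queue()

def post_oob(msg):
    # Any thread may enqueue; only the main thread drains.
    oob_queue.put(msg)

def drain_oob(logger):
    assert threading.current_thread() is threading.main_thread()
    while True:
        try:
            msg = oob_queue.get_nowait()
        except queue.Empty:
            break
        logger.warning(msg)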
import multiprocessing
import threading

def _run(name, queue, options):
    """
    The actual process that runs the separate controller instance.

    :param name: name of the process
    :param queue: Queue of the binding parent.
    :param options: Custom Options
    :type name: str
    """
    from pyplanet.core.instance import Controller
    from pyplanet.utils.log import initiate_logger, QueueHandler
    from colorlog import ColoredFormatter
    import logging

    # Tokio Asyncio (EXPERIMENTAL).
    if 'tokio' in options and options['tokio'] is True:
        import tokio
        import asyncio
        policy = tokio.TokioLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        asyncio.set_event_loop(tokio.new_event_loop())
        logging.warning('Using experimental Tokio Asyncio Loop!')

    # Logging to queue (only needed when the child doesn't inherit handlers via fork).
    if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
        initiate_logger()
        root_logger = logging.getLogger()
        formatter = ColoredFormatter(
            '%(log_color)s%(levelname)-8s%(reset)s %(yellow)s[%(threadName)s][%(name)s]%(reset)s %(blue)s%(message)s'
        )
        queue_handler = QueueHandler(queue)
        queue_handler.setFormatter(formatter)
        root_logger.addHandler(queue_handler)

    logging.getLogger(__name__).info('Starting pool process for \'{}\'...'.format(name))

    # Setting thread name to our process name.
    threading.main_thread().setName(name)

    # Start instance.
    instance = Controller.prepare(name).instance
    instance._queue = queue
    instance.start()
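The records pushed by QueueHandler have to be drained on the parent side. A minimal sketch of that consumer using the standard library's QueueListener (my assumption about the parent process, not pyplanet's actual code):

import logging
import logging.handlers
import multiprocessing

log_queue = multiprocessing.Queue()
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
listener.start()
# ... spawn the child processes, passing log_queue as the `queue` argument ...
listener.stop()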
def cron_task_container(task_dict, add_task_only=False):
    """
    Cron task container. Runs the given task, then re-registers it with the
    scheduler for its next execution.

    :param task_dict: task description, dict:
        { "target":   the callable to run (required),
          "interval": repeat interval in seconds (optional),
          "priority": task priority (optional),
          "name":     human-readable task name (optional),
          "args":     positional arguments (arg1, arg2) (optional),
          "kwargs":   keyword arguments {key: value, } (optional),
        }
    :param add_task_only: only schedule the task, do not run it now
    """
    global task_scheduler

    if not add_task_only:
        # run the task itself
        try:
            infoprint('CronTask:', task_dict.get('name', str(task_dict['target'])), 'Target:', str(task_dict['target']))

            target_func = task_dict.get('target')
            if target_func is None:
                raise ValueError("target is not given in " + str(task_dict))
            target_func(
                *(task_dict.get('args', ())),  # unpack positional and keyword arguments
                **(task_dict.get('kwargs', {}))
            )
        except:  # coverage: exclude
            errprint('ErrorWhenProcessingCronTasks', task_dict)
            traceback.print_exc()

        # if cron tasks are disabled, end this worker thread
        if not enable_cron_tasks:
            if threading.current_thread() != threading.main_thread():
                exit()
            else:
                return

    # register the task's next run with the scheduler
    task_scheduler.enter(
        task_dict.get('interval', 300),
        task_dict.get('priority', 999),
        cron_task_container,
        (task_dict,)
    )
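A hypothetical registration using the container above, assuming task_scheduler is a sched.scheduler and that enable_cron_tasks, infoprint and errprint come from the surrounding module:

import sched
import threading
import time

task_scheduler = sched.scheduler(time.time, time.sleep)
enable_cron_tasks = True

def heartbeat():
    print("still alive")

cron_task_container(
    {"target": heartbeat, "interval": 60, "priority": 1, "name": "heartbeat"},
    add_task_only=True,  # register only; the first run happens after `interval`
)
threading.Thread(target=task_scheduler.run, daemon=True).start()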
@contextmanager
def catch_signals(signals):
    """A context manager for catching signals.

    Entering this context manager starts listening for the given signals and
    returns an async iterator; exiting the context manager stops listening.

    The async iterator blocks until at least one signal has arrived, and then
    yields a :class:`set` containing all of the signals that were received
    since the last iteration.

    Note that if you leave the ``with`` block while the iterator has
    unextracted signals still pending inside it, then they will be
    re-delivered using Python's regular signal handling logic. This avoids a
    race condition when a signal arrives just before we exit the ``with``
    block.

    Args:
      signals: a set of signals to listen for.

    Raises:
      RuntimeError: if you try to use this anywhere except Python's main
        thread. (This is a Python limitation.)

    Example:
      A common convention for Unix daemons is that they should reload their
      configuration when they receive a ``SIGHUP``. Here's a sketch of what
      that might look like using :func:`catch_signals`::

          with trio.catch_signals({signal.SIGHUP}) as batched_signal_aiter:
              async for batch in batched_signal_aiter:
                  # We're only listening for one signal, so the batch is
                  # always {signal.SIGHUP}, but if we were listening to
                  # more signals then it could vary.
                  for signum in batch:
                      assert signum == signal.SIGHUP
                      reload_configuration()

    """
    if threading.current_thread() != threading.main_thread():
        raise RuntimeError(
            "Sorry, catch_signals is only possible when running in the "
            "Python interpreter's main thread"
        )
    token = _core.current_trio_token()
    queue = SignalQueue()

    def handler(signum, _):
        token.run_sync_soon(queue._add, signum, idempotent=True)

    try:
        with _signal_handler(signals, handler):
            yield queue
    finally:
        queue._redeliver_remaining()
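The _signal_handler helper is not shown in this excerpt. A plausible sketch of such a helper (my assumption, not necessarily trio's exact implementation): install the handler for every signal in the set, and restore the previous handlers on exit.

import signal
from contextlib import contextmanager

@contextmanager
def _signal_handler(signals, handler):
    original_handlers = {}
    try:
        for signum in signals:
            # signal.signal returns the previous handler so it can be restored
            original_handlers[signum] = signal.signal(signum, handler)
        yield
    finally:
        for signum, old in original_handlers.items():
            signal.signal(signum, old)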
def _index_subjects(self):
    """ Queries the triplestore for all subject URIs. """
    lg = logging.getLogger("%s.%s" % (self.ln, inspect.stack()[0][3]))
    lg.setLevel(self.log_level)

    # if the subjects have been indexed and there are no new subjects, exit
    if self.data_status.get("indexed") and not self.new_subjects:
        return

    # get a list of all the loc_subject URIs
    sparql = """
        SELECT ?s
        {
            ?s skos:inScheme <http://id.loc.gov/authorities/subjects> .
        }"""
    results = run_sparql_query(sparql=sparql)

    # start processing the results in batches
    self.time_start = datetime.datetime.now()
    batch_size = 12000
    if len(results) > batch_size:
        batch_end = batch_size
    else:
        batch_end = len(results) - 1
    batch_start = 0
    batch_num = 1
    self.batch_data = {}
    self.batch_data[batch_num] = []
    end = False
    last = False
    while not end:
        lg.debug("batch %s: %s-%s", batch_num, batch_start, batch_end)
        for i, subj in enumerate(results[batch_start:batch_end]):
            th = threading.Thread(name=batch_start + i + 1,
                                  target=self._index_subject_item,
                                  args=(iri(subj['s']['value']),
                                        i + 1, batch_num,))
            th.start()
            # self._index_subject_item(iri(subj['s']['value']), i + 1)
        print(datetime.datetime.now() - self.time_start)
        # wait for every worker thread (everything except the main thread)
        main_thread = threading.main_thread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            # print('joining %s', t.getName())
            t.join()
        action_list = \
            self.es_worker.make_action_list(self.batch_data[batch_num])
        self.es_worker.bulk_save(action_list)
        del self.batch_data[batch_num]
        batch_end += batch_size
        batch_start += batch_size
        if last:
            end = True
        if len(results) <= batch_end:
            batch_end = len(results)
            last = True
        batch_num += 1
        self.batch_data[batch_num] = []
    print(datetime.datetime.now() - self.time_start)
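Spawning one thread per subject (up to 12000 per batch) is expensive. A bounded alternative sketch using concurrent.futures, as a suggestion of my own rather than part of the original code:

from concurrent.futures import ThreadPoolExecutor

def index_batch(worker, subjects, batch_num, max_workers=32):
    # Hypothetical helper: process one batch with a fixed-size pool instead
    # of one thread per subject; exiting the `with` block waits for all tasks.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for i, subj in enumerate(subjects):
            pool.submit(worker, subj, i + 1, batch_num)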
def get_thread_tree(including_this=True):
    from .logging import THREAD_LOGGING_CONTEXT
    from .bunch import Bunch

    tree = {}
    dead_threads = set()
    contexts = {}
    stacks = {}

    def add_to_tree(thread):
        contexts[thread.ident] = THREAD_LOGGING_CONTEXT.flatten(thread.uuid)
        parent = get_thread_parent(thread)
        if isinstance(parent, DeadThread) and parent not in dead_threads:
            dead_threads.add(parent)
            add_to_tree(parent)
        tree.setdefault(parent, []).append(thread)

    for thread in threading.enumerate():
        add_to_tree(thread)

    current_ident = threading.current_thread().ident
    main_ident = threading.main_thread().ident

    for thread_ident, frame in iter_thread_frames():
        if not including_this and thread_ident == current_ident:
            formatted = "  <this frame>"
        else:
            # show the entire stack if it's this thread, don't skip ('after_module') anything
            show_all = thread_ident in (current_ident, main_ident)
            formatted = format_thread_stack(frame, skip_modules=[] if show_all else _BOOTSTRAPPERS) if frame else ''
        stacks[thread_ident] = formatted, time.time()

    def add_thread(parent_thread, parent):
        for thread in sorted(tree[parent_thread], key=lambda thread: thread.name):
            ident = thread.ident or 0
            stack, ts = stacks.get(ident, ("", 0))
            context = contexts.get(ident, {})
            context_line = ", ".join("%s: %s" % (k, context[k]) for k in "host context".split() if context.get(k))
            this = Bunch(
                name=thread.name,
                daemon="[D]" if getattr(thread, "daemon", False) else "",
                ident=ident,
                context_line="({})".format(context_line) if context_line else "",
                stack=stack,
                timestamp=ts,
                children=[],
            )
            parent.children.append(this)
            if thread in tree:
                add_thread(thread, this)
        return parent

    return add_thread(None, Bunch(children=[]))
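A hypothetical consumer of the tree returned above (my own sketch; it assumes the Bunch nodes support attribute access as shown in the snippet):

def print_thread_tree(node, depth=0):
    # Walk the Bunch tree from get_thread_tree() and print one line per
    # thread, indenting children under their parents.
    for child in node.children:
        print("%s%s %s %s" % ("  " * depth, child.daemon, child.name, child.context_line))
        print_thread_tree(child, depth + 1)

print_thread_tree(get_thread_tree(including_this=False))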