def cron_task_host():
    """Host loop for cron tasks: wake up once a minute and run the scheduler.

    Every iteration first checks the global ``enable_cron_tasks`` switch.
    When it is off the host stops: a background thread terminates itself by
    raising ``SystemExit``, while the main thread simply returns to its
    caller.  Otherwise the loop sleeps 60 seconds and then calls
    ``task_scheduler.run()``.  Ordinary errors raised by the scheduler are
    reported through ``errprint`` plus a traceback, and the loop continues.
    """
    while True:
        # Global switch turned off -> stop hosting cron tasks.
        if not enable_cron_tasks:
            if threading.current_thread() != threading.main_thread():
                # Raise SystemExit directly instead of calling ``exit()``:
                # that helper is injected by ``site`` for interactive use,
                # may be absent (``python -S``) and closes stdin.
                raise SystemExit
            return

        sleep(60)
        try:
            task_scheduler.run()
        except Exception:  # coverage: exclude
            # Deliberately not a bare ``except:`` so SystemExit and
            # KeyboardInterrupt are not swallowed by the loop.
            errprint('ErrorDuringExecutingCronTasks')
            traceback.print_exc()
python类main_thread()的实例源码
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
    """Install a custom SIGINT handler for the duration of the single ``yield``.

    Nothing is installed (the generator just yields once) when called off the
    main thread, or when the current SIGINT handler is not
    ``signal.default_int_handler``.

    Otherwise a handler is registered that calls ``deliver_cb()`` when
    ``ki_protection_enabled(frame)`` is true or when
    ``restrict_keyboard_interrupt_to_checkpoints`` is set, and raises
    ``KeyboardInterrupt`` in all other cases.  On exit the default handler is
    restored, but only if our handler is still the installed one.

    NOTE(review): this is a generator function; presumably it is wrapped by a
    context-manager decorator that is not visible here -- confirm.
    """
    off_main_thread = threading.current_thread() != threading.main_thread()
    if (off_main_thread
            or signal.getsignal(signal.SIGINT) != signal.default_int_handler):
        yield
        return

    def handler(signum, frame):
        assert signum == signal.SIGINT
        if (ki_protection_enabled(frame)
                or restrict_keyboard_interrupt_to_checkpoints):
            deliver_cb()
            return
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, handler)
    try:
        yield
    finally:
        # Leave the handler alone if somebody replaced ours in the meantime.
        still_ours = signal.getsignal(signal.SIGINT) is handler
        if still_ours:
            signal.signal(signal.SIGINT, signal.default_int_handler)
def test_threaded(testbot):
    """Check that ``testbot.load_data()`` succeeds when run off the main thread.

    The call is made in a worker thread; success is reported back through a
    ``threading.Event`` that the test asserts on after joining the thread.
    """
    succeeded = threading.Event()

    def worker(flag):
        # If a new event loop isn't created for the thread, this will crash
        try:
            assert threading.current_thread() != threading.main_thread()
            testbot.load_data()
        except Exception as exc:
            # Pytest captures this stdout output and prints it; the cleared
            # flag is what makes the test fail.
            print(exc)
            flag.clear()
            return
        flag.set()

    runner = threading.Thread(target=worker, args=(succeeded, ))
    runner.start()
    runner.join()
    assert succeeded.is_set()
def test_main_thread_after_fork(self):
    """Check ``threading.main_thread()`` inside a child created by ``os.fork()``.

    A helper interpreter runs a script that forks from its main thread; the
    child prints the main thread's name and whether its ident matches both
    ``current_thread().ident`` and ``get_ident()``.  The test expects empty
    stderr and the output ``MainThread``, ``True``, ``True``.
    """
    # The leading ``if 1:`` lets the script be indented inside this literal;
    # its body must stay indented, otherwise the child fails to parse it.
    code = """if 1:
        import os, threading
        pid = os.fork()
        if pid == 0:
            main = threading.main_thread()
            print(main.name)
            print(main.ident == threading.current_thread().ident)
            print(main.ident == threading.get_ident())
        else:
            os.waitpid(pid, 0)
        """
    _, out, err = assert_python_ok("-c", code)
    # Normalise line endings before comparing.
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "MainThread\nTrue\nTrue\n")
def test_main_thread_after_fork_from_nonmain_thread(self):
    """Check ``threading.main_thread()`` in a child forked from a worker thread.

    The helper script forks inside a non-main thread; the child prints the
    name of what it now reports as the main thread and whether that thread's
    ident matches ``current_thread().ident`` and ``get_ident()``.  The test
    expects empty stderr and the output ``Thread-1``, ``True``, ``True``.
    """
    # The leading ``if 1:`` lets the script be indented inside this literal;
    # its body must stay indented, otherwise the child fails to parse it.
    code = """if 1:
        import os, threading, sys
        def f():
            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
                # stdout is fully buffered because not a tty,
                # we have to flush before exit.
                sys.stdout.flush()
            else:
                os.waitpid(pid, 0)
        th = threading.Thread(target=f)
        th.start()
        th.join()
        """
    _, out, err = assert_python_ok("-c", code)
    # Normalise line endings before comparing.
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    # NOTE(review): since Python 3.10 the default name of an unnamed thread
    # includes its target ("Thread-1 (f)"), so this expectation only matches
    # older interpreters -- confirm the targeted version.
    self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
def test_3_join_in_forked_from_thread(self):
    """Join the original main thread from a child forked in a worker thread."""
    # Like the test above, but fork() was called from a worker thread
    # In the forked process, the main Thread object must be marked as stopped.
    #
    # The leading ``if 1:`` lets the script be indented inside this literal;
    # its body must stay indented, otherwise it cannot be parsed.
    # NOTE(review): the script uses ``threading``, ``os``, ``sys`` and
    # ``joiningfunc`` without defining them; presumably ``_run_and_join``
    # supplies them -- confirm.
    script = """if 1:
        main_thread = threading.current_thread()
        def worker():
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)
            t = threading.Thread(target=joiningfunc,
                                 args=(main_thread,))
            print('end of main')
            t.start()
            t.join() # Should not block: main_thread is already stopped
        w = threading.Thread(target=worker)
        w.start()
        """
    self._run_and_join(script)
def initialize_exception_listener():  # must be invoked in main thread in "geventless" runs in order for raise_in_main_thread to work
    """Register a SIGUSR1 handler that re-raises the pending ``LAST_ERROR``.

    Does nothing when ``REGISTERED_SIGNAL`` is already set.

    Raises:
        NotMainThread: when called from a thread other than the main thread.
        SignalAlreadyBound: when SIGUSR1 already has a handler other than
            ``SIG_DFL`` / ``SIG_IGN``.
    """
    global REGISTERED_SIGNAL

    if REGISTERED_SIGNAL:
        return  # already registered

    if threading.current_thread() is not threading.main_thread():
        raise NotMainThread()

    def handle_signal(sig, stack):
        # Take the pending error (clearing the slot) and raise it here.
        global LAST_ERROR
        pending, LAST_ERROR = LAST_ERROR, None
        if not pending:
            raise LastErrorEmpty(signal=sig)
        raise pending

    custom_signal = signal.SIGUSR1
    installed = signal.getsignal(custom_signal)

    # check if signal is already trapped
    if installed not in (signal.SIG_DFL, signal.SIG_IGN):
        raise SignalAlreadyBound(signal=custom_signal)

    signal.signal(custom_signal, handle_signal)
    REGISTERED_SIGNAL = custom_signal
def raise_in_main_thread(exception_type=Exception):
    """Forward ``exception_type`` errors raised around the ``yield`` via ``_rimt``.

    ``ProcessExiting`` is always re-raised as is.  An ``exception_type`` error
    caught on the main thread is re-raised unchanged.  On any other thread the
    exception is tagged with ``_raised_asynchronously = True``; if another
    error is already stored in ``LAST_ERROR`` a warning is logged and the
    exception is re-raised locally, otherwise it is stored in ``LAST_ERROR``
    and handed to ``_rimt``.

    NOTE(review): this is a generator function; presumably it is wrapped by a
    context-manager decorator that is not visible here -- confirm.
    """
    global LAST_ERROR

    try:
        yield
    except ProcessExiting:
        # this exception is meant to stay within the thread
        raise
    except exception_type as exc:
        if threading.current_thread() is threading.main_thread():
            raise
        exc._raised_asynchronously = True

        if not LAST_ERROR:
            LAST_ERROR = exc
            _rimt(exc)
        else:
            _logger.warning("a different error (%s) is pending - skipping", type(LAST_ERROR))
            raise
def test_main_thread_after_fork(self):
    """Check ``threading.main_thread()`` inside a child created by ``os.fork()``.

    A helper interpreter runs a script that forks from its main thread; the
    child prints the main thread's name and whether its ident matches both
    ``current_thread().ident`` and ``get_ident()``.  The test expects empty
    stderr and the output ``MainThread``, ``True``, ``True``.
    """
    # The leading ``if 1:`` lets the script be indented inside this literal;
    # its body must stay indented, otherwise the child fails to parse it.
    code = """if 1:
        import os, threading
        pid = os.fork()
        if pid == 0:
            main = threading.main_thread()
            print(main.name)
            print(main.ident == threading.current_thread().ident)
            print(main.ident == threading.get_ident())
        else:
            os.waitpid(pid, 0)
        """
    _, out, err = assert_python_ok("-c", code)
    # Normalise line endings before comparing.
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    self.assertEqual(data, "MainThread\nTrue\nTrue\n")
def test_main_thread_after_fork_from_nonmain_thread(self):
    """Check ``threading.main_thread()`` in a child forked from a worker thread.

    The helper script forks inside a non-main thread; the child prints the
    name of what it now reports as the main thread and whether that thread's
    ident matches ``current_thread().ident`` and ``get_ident()``.  The test
    expects empty stderr and the output ``Thread-1``, ``True``, ``True``.
    """
    # The leading ``if 1:`` lets the script be indented inside this literal;
    # its body must stay indented, otherwise the child fails to parse it.
    code = """if 1:
        import os, threading, sys
        def f():
            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
                # stdout is fully buffered because not a tty,
                # we have to flush before exit.
                sys.stdout.flush()
            else:
                os.waitpid(pid, 0)
        th = threading.Thread(target=f)
        th.start()
        th.join()
        """
    _, out, err = assert_python_ok("-c", code)
    # Normalise line endings before comparing.
    data = out.decode().replace('\r', '')
    self.assertEqual(err, b"")
    # NOTE(review): since Python 3.10 the default name of an unnamed thread
    # includes its target ("Thread-1 (f)"), so this expectation only matches
    # older interpreters -- confirm the targeted version.
    self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
def test_3_join_in_forked_from_thread(self):
    """Join the original main thread from a child forked in a worker thread."""
    # Like the test above, but fork() was called from a worker thread
    # In the forked process, the main Thread object must be marked as stopped.
    #
    # The leading ``if 1:`` lets the script be indented inside this literal;
    # its body must stay indented, otherwise it cannot be parsed.
    # NOTE(review): the script uses ``threading``, ``os``, ``sys`` and
    # ``joiningfunc`` without defining them; presumably ``_run_and_join``
    # supplies them -- confirm.
    script = """if 1:
        main_thread = threading.current_thread()
        def worker():
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)
            t = threading.Thread(target=joiningfunc,
                                 args=(main_thread,))
            print('end of main')
            t.start()
            t.join() # Should not block: main_thread is already stopped
        w = threading.Thread(target=worker)
        w.start()
        """
    self._run_and_join(script)
def destroy(self, path):    # fusermount -u or SIGINT aka control-C
    """Report this node as offline to the librarian and close ``self.torms``.

    Sends an ``update_node_soc_status`` command (offline status, all usage
    figures zero) followed by ``update_node_mc_status`` (offline), asserts
    that it is running on the main thread, then closes and deletes
    ``self.torms``.  ``path`` is not used.
    """
    self.lfs_status = FRDnode.SOC_STATUS_OFFLINE

    # All usage metrics are reported as zero for an offline node.
    zeroed_metrics = dict.fromkeys(
        ('cpu_percent', 'rootfs_percent', 'network_in', 'network_out',
         'mem_percent'),
        0)
    soc_command = self.lcp('update_node_soc_status',
                           status=FRDnode.SOC_STATUS_OFFLINE,
                           **zeroed_metrics)
    self.librarian(soc_command)

    mc_command = self.lcp('update_node_mc_status',
                          status=FRDFAModule.MC_STATUS_OFFLINE)
    self.librarian(mc_command)

    assert threading.current_thread() is threading.main_thread()
    self.torms.close()
    del self.torms
# helpers
mainThread.py 文件源码
项目:Learning-Concurrency-in-Python
作者: PacktPublishing
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def myChildThread():
    """Sleep five seconds, then print the current and the main thread objects."""
    divider = "-------------------------"
    print("Child Thread Starting")
    time.sleep(5)
    # One (heading, lookup) pair per printed section.
    sections = (
        ("Current Thread ----------", threading.current_thread),
        ("Main Thread -------------", threading.main_thread),
    )
    for heading, lookup in sections:
        print(heading)
        print(lookup())
        print(divider)
    print("Child Thread Ending")
currentThread.py 文件源码
项目:Learning-Concurrency-in-Python
作者: PacktPublishing
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def myChildThread():
    """Sleep five seconds, then print the current and the main thread objects."""
    divider = "-------------------------"
    print("Child Thread Starting")
    time.sleep(5)
    # One (heading, lookup) pair per printed section.
    sections = (
        ("Current Thread ----------", threading.current_thread),
        ("Main Thread -------------", threading.main_thread),
    )
    for heading, lookup in sections:
        print(heading)
        print(lookup())
        print(divider)
    print("Child Thread Ending")
# Callback for a newly read stream title.  Asserts it runs on the main
# thread, ignores a None title, records a changed title, applies the
# configured advertisement handling depending on whether the title is
# blacklisted, and notifies dispatchers.player.song_changed.
# NOTE(review): leading indentation is missing in this copy, so the exact
# nesting of the branches below cannot be confirmed from here -- verify
# against the original file before changing any logic.
def on_title_read(self, title):
# Must be called on the main thread.
assert threading.current_thread() == threading.main_thread()
# No title available: nothing to do.
if title is None:
return
# Only react when the title differs from the last one seen.
if title != self._last_title:
self._last_title = title
# TODO: Fade volume gradually
# TODO: Allow user to choose what to do when an advertisement block is detected.
# Ideas for possible options:
# * reduce or mute volume
# * play random audio file from a local directory
# * switch to another radio station
# * repeat part of last song
print("Title changed to %s" % title)
# If the title contains a blacklisted tag, reduce volume
if BlacklistStorage.is_blacklisted(title):
# Entering an advertisement block (not already inside one).
if not self._in_ad_block:
print('Advertisement tag detected.')
# Volume-reducing modes: lower the volume and record the ad block start.
if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
print('Reducing volume.')
self.volume = config.ad_block_volume
self._in_ad_block = True
self._last_ad_time = time.time()
# Switch-only mode: change station instead of touching the volume.
elif config.block_mode == config.BlockMode.SWITCH_STATION:
self.switch_to_another_station()
# Presumably the non-blacklisted branch (pairs with the is_blacklisted check) -- confirm.
else:
# Leaving an advertisement block: undo the volume change and reset state.
if self._in_ad_block:
print('Restoring volume to maximum.')
if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
self.volume = config.max_volume
self._in_ad_block = False
self._last_ad_time = None
self._just_switched = False
# Tell listeners about the title.
dispatchers.player.song_changed(title)
def fire_state_change(self):
    """Notify ``dispatchers.player`` of the current ``self.is_playing`` value.

    Asserts that it is called on the main thread.
    """
    assert threading.current_thread() == threading.main_thread()
    notify = dispatchers.player.playing_state_changed
    notify(self.is_playing)
def __init__(self):
    """Create the I/O completion port and the manager's bookkeeping state.

    Also creates a ``WakeupSocketpair`` whose wakeup socket is registered as
    a read waiter mapped to ``None``.  When constructed on the main thread,
    the pair's write socket is installed via ``signal.set_wakeup_fd`` and the
    previous wakeup fd is kept in ``self._old_signal_wakeup_fd``.
    """
    # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx
    # The flag is True while the port is being created and is flipped to
    # False right after the handle has been checked and stored.
    self._closed = True
    raw_port = kernel32.CreateIoCompletionPort(
        INVALID_HANDLE_VALUE, ffi.NULL, 0, 0
    )
    self._iocp = _check(raw_port)
    self._closed = False

    self._iocp_queue = deque()
    self._iocp_thread = None
    self._overlapped_waiters = {}
    self._completion_key_queues = {}
    # Completion key 0 is reserved for regular IO events
    self._completion_key_counter = itertools.count(1)

    # {stdlib socket object: task}
    # except that wakeup socket is mapped to None
    self._socket_waiters = {"read": {}, "write": {}}
    waker = WakeupSocketpair()
    self._main_thread_waker = waker
    self._socket_waiters["read"][waker.wakeup_sock] = None

    # This is necessary to allow control-C to interrupt select().
    # https://github.com/python-trio/trio/issues/42
    if threading.current_thread() == threading.main_thread():
        write_fd = waker.write_sock.fileno()
        self._old_signal_wakeup_fd = signal.set_wakeup_fd(write_fd)
def close(self):
    """Release the completion port and related resources; no-op if closed.

    Closes the IOCP handle, joins the IOCP thread if there is one, closes the
    wakeup socket pair and, when running on the main thread, restores the
    wakeup fd saved in ``self._old_signal_wakeup_fd``.
    """
    if self._closed:
        return

    self._closed = True
    _check(kernel32.CloseHandle(self._iocp))

    iocp_thread = self._iocp_thread
    if iocp_thread is not None:
        iocp_thread.join()

    self._main_thread_waker.close()

    if threading.current_thread() == threading.main_thread():
        signal.set_wakeup_fd(self._old_signal_wakeup_fd)
def main_native_thread():
    """Return the main thread object reported by ``__threading__``."""
    native_main = __threading__.main_thread()  # pylint:disable=no-member
    return native_main
def not_on_main_thread() -> bool:
    """Return True when the calling thread is not the main thread."""
    caller = threading.current_thread()
    return caller != threading.main_thread()
def remove_heart_log(*args, **kwargs):
    """Forward the arguments to ``debug_log`` only when on the main thread.

    Under ``six.PY2`` the main thread is recognised by the thread name
    ``'MainThread'``; otherwise by comparison with
    ``threading.main_thread()``.
    """
    if six.PY2:
        on_main = threading.current_thread().name == 'MainThread'
    else:
        on_main = threading.current_thread() == threading.main_thread()

    if on_main:
        debug_log(*args, **kwargs)
def _sketch_raise_in_main(exc):
''' Sketchy way to raise an exception in the main thread.
'''
if isinstance(exc, BaseException):
exc = type(exc)
elif issubclass(exc, BaseException):
pass
else:
raise TypeError('Must raise an exception.')
# Figure out the id of the main thread
main_id = threading.main_thread().ident
thread_ref = ctypes.c_long(main_id)
exc = ctypes.py_object(exc)
result = ctypes.pythonapi.PyThreadState_SetAsyncExc(
thread_ref,
exc
)
# 0 Is failed.
if result == 0:
raise SystemError('Main thread had invalid ID?')
# 1 succeeded
# > 1 failed
elif result > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(main_id, 0)
raise SystemError('Failed to raise in main thread.')
def _watch_for_exit(self):
''' Automatically watches for termination of the main thread and
then closes self gracefully.
'''
main = threading.main_thread()
main.join()
self._stop_nowait()
def test_main_thread(self):
    """Check ``threading.main_thread()`` from the main thread and a worker.

    On the calling thread the main thread must be named ``MainThread`` and
    share its ident with ``current_thread()`` / ``get_ident()``.  Inside a
    worker thread the main thread's ident must differ from the worker's.
    """
    main = threading.main_thread()
    self.assertEqual(main.name, 'MainThread')
    self.assertEqual(main.ident, threading.current_thread().ident)
    self.assertEqual(main.ident, threading.get_ident())

    # Collect the idents inside the worker and assert on this thread: an
    # assertion failing inside the worker would not propagate to the test.
    observed = []

    def f():
        observed.append((threading.main_thread().ident,
                         threading.current_thread().ident))

    th = threading.Thread(target=f)
    th.start()
    th.join()

    self.assertEqual(len(observed), 1)
    main_ident, worker_ident = observed[0]
    self.assertNotEqual(main_ident, worker_ident)
def on_main_thread():
    """Return True when the calling thread is the main thread."""
    caller = threading.current_thread()
    return caller is threading.main_thread()
def _on_main_thread():
"""Checks if we are on the main thread or not. Duplicated from xonsh.tools
here so that this module only relies on the Python standrd library.
"""
return threading.current_thread() is threading.main_thread()
def main_native_thread():
    """Return the main thread object reported by ``__threading__``."""
    native_main = __threading__.main_thread()  # pylint:disable=no-member
    return native_main
def _patch_module_locks():
    """Replace ``_ModuleLock.release`` with a wrapper that fixes the owner.

    Before delegating to the original ``release``, the wrapper rewrites a
    lock owner equal to ``main_thread_ident_before_patching`` to the current
    main thread's ident.
    """
    # gevent will not patch existing locks (including ModuleLocks) when it's not single threaded
    # our solution is to monkey patch the release method for ModuleLocks objects
    # we assume that patching is done early enough so no other locks are present
    import importlib

    module_lock_cls = importlib._bootstrap._ModuleLock
    _old_release = module_lock_cls.release

    def _release(*args, **kw):
        lock = args[0]
        owned_by_old_main = lock.owner == main_thread_ident_before_patching
        if owned_by_old_main:
            lock.owner = threading.main_thread().ident
        _old_release(*args, **kw)

    module_lock_cls.release = _release
def _rimt(exc):
    """Kill the main thread's greenlet with ``exc``, then raise ``exc`` here.

    The greenlet's ``throw`` is wrapped first so that a single exception
    argument is expanded to ``(class, instance, traceback)``.
    """
    _logger.info('YELLOW<<killing main thread greenlet>>')
    main_thread_greenlet = threading.main_thread()._greenlet
    orig_throw = main_thread_greenlet.throw

    # we must override "throw" method so exception will be raised with the original traceback
    def throw(*args):
        if len(args) != 1:
            return orig_throw(*args)
        (ex,) = args
        return orig_throw(ex.__class__, ex, ex.__traceback__)

    main_thread_greenlet.throw = throw
    gevent.kill(main_thread_greenlet, exc)
    _logger.debug('exiting the thread that failed')
    raise exc
def test_main_thread(self):
    """Check ``threading.main_thread()`` from the main thread and a worker.

    On the calling thread the main thread must be named ``MainThread`` and
    share its ident with ``current_thread()`` / ``get_ident()``.  Inside a
    worker thread the main thread's ident must differ from the worker's.
    """
    main = threading.main_thread()
    self.assertEqual(main.name, 'MainThread')
    self.assertEqual(main.ident, threading.current_thread().ident)
    self.assertEqual(main.ident, threading.get_ident())

    # Collect the idents inside the worker and assert on this thread: an
    # assertion failing inside the worker would not propagate to the test.
    observed = []

    def f():
        observed.append((threading.main_thread().ident,
                         threading.current_thread().ident))

    th = threading.Thread(target=f)
    th.start()
    th.join()

    self.assertEqual(len(observed), 1)
    main_ident, worker_ident = observed[0]
    self.assertNotEqual(main_ident, worker_ident)