python类main_thread()的实例源码

zmirror.py 文件源码 项目:zmirror 作者: aploium 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def cron_task_host():
    """??????, ?????????, ???????????"""
    while True:
        # ????????, ??????
        if not enable_cron_tasks:
            if threading.current_thread() != threading.main_thread():
                exit()
            else:
                return

        sleep(60)
        try:
            task_scheduler.run()
        except:  # coverage: exclude
            errprint('ErrorDuringExecutingCronTasks')
            traceback.print_exc()
_ki.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
    if (threading.current_thread() != threading.main_thread()
            or signal.getsignal(signal.SIGINT) != signal.default_int_handler):
        yield
        return

    def handler(signum, frame):
        assert signum == signal.SIGINT
        protection_enabled = ki_protection_enabled(frame)
        if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
            deliver_cb()
        else:
            raise KeyboardInterrupt

    signal.signal(signal.SIGINT, handler)
    try:
        yield
    finally:
        if signal.getsignal(signal.SIGINT) is handler:
            signal.signal(signal.SIGINT, signal.default_int_handler)
test_core.py 文件源码 项目:r2-d7 作者: FreakyDug 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_threaded(testbot):
    def threadtest(signal):
        # If a new event loop isn't created for the thread, this will crash
        try:
            assert threading.current_thread() != threading.main_thread()
            testbot.load_data()
        except Exception as error:
            # Pytest will catch this stdout and print it and the signal will
            # fail the test
            print(error)
            signal.clear()
        else:
            signal.set()

    signal = threading.Event()
    thread = threading.Thread(target=threadtest, args=(signal, ))
    thread.start()
    thread.join()
    assert signal.is_set()
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_main_thread_after_fork(self):
        code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "MainThread\nTrue\nTrue\n")
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_main_thread_after_fork_from_nonmain_thread(self):
        code = """if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.

        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)
concurrency.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def initialize_exception_listener():  # must be invoked in main thread in "geventless" runs in order for raise_in_main_thread to work
    global REGISTERED_SIGNAL
    if REGISTERED_SIGNAL:
        # already registered
        return

    if threading.current_thread() is not threading.main_thread():
        raise NotMainThread()

    def handle_signal(sig, stack):
        global LAST_ERROR
        error = LAST_ERROR
        LAST_ERROR = None
        if error:
            raise error
        raise LastErrorEmpty(signal=sig)

    custom_signal = signal.SIGUSR1
    if signal.getsignal(custom_signal) in (signal.SIG_DFL, signal.SIG_IGN):  # check if signal is already trapped
        signal.signal(custom_signal, handle_signal)
        REGISTERED_SIGNAL = custom_signal
    else:
        raise SignalAlreadyBound(signal=custom_signal)
concurrency.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def raise_in_main_thread(exception_type=Exception):

    try:
        yield
    except ProcessExiting:
        # this exception is meant to stay within the thread
        raise
    except exception_type as exc:
        if threading.current_thread() is threading.main_thread():
            raise
        exc._raised_asynchronously = True

        global LAST_ERROR
        if LAST_ERROR:
            _logger.warning("a different error (%s) is pending - skipping", type(LAST_ERROR))
            raise
        LAST_ERROR = exc
        _rimt(exc)
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_main_thread_after_fork(self):
        code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "MainThread\nTrue\nTrue\n")
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_main_thread_after_fork_from_nonmain_thread(self):
        code = """if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.

        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)
lfs_fuse.py 文件源码 项目:tm-librarian 作者: FabricAttachedMemory 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def destroy(self, path):    # fusermount -u or SIGINT aka control-C
        self.lfs_status = FRDnode.SOC_STATUS_OFFLINE
        self.librarian(self.lcp('update_node_soc_status',
                                status=FRDnode.SOC_STATUS_OFFLINE,
                                cpu_percent=0,
                                rootfs_percent=0,
                                network_in=0,
                                network_out=0,
                                mem_percent=0))
        self.librarian(self.lcp('update_node_mc_status',
                                status=FRDFAModule.MC_STATUS_OFFLINE))
        assert threading.current_thread() is threading.main_thread()
        self.torms.close()
        del self.torms

    # helpers
mainThread.py 文件源码 项目:Learning-Concurrency-in-Python 作者: PacktPublishing 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def myChildThread():
  print("Child Thread Starting")
  time.sleep(5)
  print("Current Thread ----------")
  print(threading.current_thread())
  print("-------------------------")
  print("Main Thread -------------")
  print(threading.main_thread())
  print("-------------------------")
  print("Child Thread Ending")
currentThread.py 文件源码 项目:Learning-Concurrency-in-Python 作者: PacktPublishing 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def myChildThread():
  print("Child Thread Starting")
  time.sleep(5)
  print("Current Thread ----------")
  print(threading.current_thread())
  print("-------------------------")
  print("Main Thread -------------")
  print(threading.main_thread())
  print("-------------------------")
  print("Child Thread Ending")
player.py 文件源码 项目:adblockradio 作者: quasoft 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def on_title_read(self, title):
        assert threading.current_thread() == threading.main_thread()

        if title is None:
            return

        if title != self._last_title:
            self._last_title = title

            # TODO: Fade volume gradually
            # TODO: Allow user to choose what to do when an advertisement block is detected.
            #       Ideas for possible options:
            #       * reduce or mute volume
            #       * play random audio file from a local directory
            #       * switch to another radio station
            #       * repeat part of last song
            print("Title changed to %s" % title)

            # If the title contains a blacklisted tag, reduce volume
            if BlacklistStorage.is_blacklisted(title):
                if not self._in_ad_block:
                    print('Advertisement tag detected.')
                    if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
                        print('Reducing volume.')
                        self.volume = config.ad_block_volume
                        self._in_ad_block = True
                        self._last_ad_time = time.time()
                    elif config.block_mode == config.BlockMode.SWITCH_STATION:
                        self.switch_to_another_station()
            else:
                if self._in_ad_block:
                    print('Restoring volume to maximum.')
                    if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
                        self.volume = config.max_volume
                    self._in_ad_block = False
                    self._last_ad_time = None
                    self._just_switched = False

            dispatchers.player.song_changed(title)
player.py 文件源码 项目:adblockradio 作者: quasoft 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fire_state_change(self):
        assert threading.current_thread() == threading.main_thread()
        dispatchers.player.playing_state_changed(self.is_playing)
_io_windows.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self):
        # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx
        self._closed = True
        self._iocp = _check(
            kernel32.
            CreateIoCompletionPort(INVALID_HANDLE_VALUE, ffi.NULL, 0, 0)
        )
        self._closed = False
        self._iocp_queue = deque()
        self._iocp_thread = None
        self._overlapped_waiters = {}
        self._completion_key_queues = {}
        # Completion key 0 is reserved for regular IO events
        self._completion_key_counter = itertools.count(1)

        # {stdlib socket object: task}
        # except that wakeup socket is mapped to None
        self._socket_waiters = {"read": {}, "write": {}}
        self._main_thread_waker = WakeupSocketpair()
        wakeup_sock = self._main_thread_waker.wakeup_sock
        self._socket_waiters["read"][wakeup_sock] = None

        # This is necessary to allow control-C to interrupt select().
        # https://github.com/python-trio/trio/issues/42
        if threading.current_thread() == threading.main_thread():
            fileno = self._main_thread_waker.write_sock.fileno()
            self._old_signal_wakeup_fd = signal.set_wakeup_fd(fileno)
_io_windows.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def close(self):
        if not self._closed:
            self._closed = True
            _check(kernel32.CloseHandle(self._iocp))
            if self._iocp_thread is not None:
                self._iocp_thread.join()
            self._main_thread_waker.close()
            if threading.current_thread() == threading.main_thread():
                signal.set_wakeup_fd(self._old_signal_wakeup_fd)
threading.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def main_native_thread():
        return __threading__.main_thread() # pylint:disable=no-member
components.py 文件源码 项目:ribosome 作者: tek 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def not_on_main_thread() -> bool:
    return threading.current_thread() != threading.main_thread()
httrader.py 文件源码 项目:OdooQuant 作者: haogefeifei 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def remove_heart_log(*args, **kwargs):
    if six.PY2:
        if threading.current_thread().name == 'MainThread':
            debug_log(*args, **kwargs)
    else:
        if threading.current_thread() == threading.main_thread():
            debug_log(*args, **kwargs)
_signals_windows.py 文件源码 项目:py_daemoniker 作者: Muterra 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _sketch_raise_in_main(exc):
    ''' Sketchy way to raise an exception in the main thread.
    '''
    if isinstance(exc, BaseException):
        exc = type(exc)
    elif issubclass(exc, BaseException):
        pass
    else:
        raise TypeError('Must raise an exception.')

    # Figure out the id of the main thread
    main_id = threading.main_thread().ident
    thread_ref = ctypes.c_long(main_id)
    exc = ctypes.py_object(exc)

    result = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        thread_ref,
        exc
    )

    # 0 Is failed.
    if result == 0:
        raise SystemError('Main thread had invalid ID?')
    # 1 succeeded
    # > 1 failed
    elif result > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(main_id, 0)
        raise SystemError('Failed to raise in main thread.')
_signals_windows.py 文件源码 项目:py_daemoniker 作者: Muterra 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _watch_for_exit(self):
        ''' Automatically watches for termination of the main thread and
        then closes self gracefully.
        '''
        main = threading.main_thread()
        main.join()
        self._stop_nowait()
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_main_thread(self):
        main = threading.main_thread()
        self.assertEqual(main.name, 'MainThread')
        self.assertEqual(main.ident, threading.current_thread().ident)
        self.assertEqual(main.ident, threading.get_ident())

        def f():
            self.assertNotEqual(threading.main_thread().ident,
                                threading.current_thread().ident)
        th = threading.Thread(target=f)
        th.start()
        th.join()
tools.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def on_main_thread():
    """Checks if we are on the main thread or not."""
    return threading.current_thread() is threading.main_thread()
teepty.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _on_main_thread():
    """Checks if we are on the main thread or not. Duplicated from xonsh.tools 
    here so that this module only relies on the Python standrd library.
    """
    return threading.current_thread() is threading.main_thread()
threading.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def main_native_thread():
        return __threading__.main_thread() # pylint:disable=no-member
gevent.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _patch_module_locks():
    # gevent will not patch existing locks (including ModuleLocks) when it's not single threaded
    # our solution is to monkey patch the release method for ModuleLocks objects
    # we assume that patching is done early enough so no other locks are present

    import importlib
    _old_release = importlib._bootstrap._ModuleLock.release

    def _release(*args, **kw):
        lock = args[0]
        if lock.owner == main_thread_ident_before_patching:
            lock.owner = threading.main_thread().ident
        _old_release(*args, **kw)

    importlib._bootstrap._ModuleLock.release = _release
concurrency.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _rimt(exc):
        _logger.info('YELLOW<<killing main thread greenlet>>')
        main_thread_greenlet = threading.main_thread()._greenlet
        orig_throw = main_thread_greenlet.throw

        # we must override "throw" method so exception will be raised with the original traceback
        def throw(*args):
            if len(args) == 1:
                ex = args[0]
                return orig_throw(ex.__class__, ex, ex.__traceback__)
            return orig_throw(*args)
        main_thread_greenlet.throw = throw
        gevent.kill(main_thread_greenlet, exc)
        _logger.debug('exiting the thread that failed')
        raise exc
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_main_thread(self):
        main = threading.main_thread()
        self.assertEqual(main.name, 'MainThread')
        self.assertEqual(main.ident, threading.current_thread().ident)
        self.assertEqual(main.ident, threading.get_ident())

        def f():
            self.assertNotEqual(threading.main_thread().ident,
                                threading.current_thread().ident)
        th = threading.Thread(target=f)
        th.start()
        th.join()


问题


面经


文章

微信
公众号

扫码关注公众号