python类start_new_thread()的实例源码

test_verify1.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _run_in_multiple_threads(test1):
    """Call ``test1`` once in the calling thread, then once in each of 10 threads.

    Exceptions raised inside the worker threads are collected.  The workers
    are awaited in start order, and as soon as a collected exception is
    seen after one of those waits, the first collected exception is
    re-raised in the calling thread.
    """
    test1()
    import sys
    try:
        import thread
    except ImportError:
        import _thread as thread
    failures = []

    def worker(done):
        try:
            test1()
        except:
            # Deliberately catch everything so the lock below is always released.
            failures.append(sys.exc_info())
        done.release()

    pending = []
    for _ in range(10):
        # Each worker gets an already-held lock and releases it when finished.
        done = thread.allocate_lock()
        done.acquire()
        thread.start_new_thread(worker, (done,))
        pending.append(done)
    for done in pending:
        # Re-acquiring blocks until the matching worker has released its lock.
        done.acquire()
        if failures:
            raise failures[0][1]
main.py 文件源码 项目:HanTTS 作者: junzew 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def speak(self, text):
        """Convert ``text`` to pinyin syllables and start one playback thread per syllable.

        Punctuation listed in ``TextToSpeech.punctuation`` is removed from
        every syllable; syllables made only of digits are expanded with
        ``atc.num2chinese`` and converted to pinyin again.  Each thread runs
        ``TextToSpeech._play_audio`` with the syllable's wav path and an
        offset that grows by 0.355 per syllable (presumably a start delay in
        seconds -- confirm against ``_play_audio``).
        """
        raw_syllables = lazy_pinyin(text, style=pypinyin.TONE3)
        print(raw_syllables)

        # Normalise the syllable list before any audio is scheduled.
        playable = []
        for token in raw_syllables:
            for mark in TextToSpeech.punctuation:
                token = token.replace(mark, "")
            if not token.isdigit():
                playable.append(token)
                continue
            spelled_out = atc.num2chinese(token)
            playable.extend(lazy_pinyin(spelled_out, style=pypinyin.TONE3))

        offset = 0
        for token in playable:
            wav_path = "syllables/" + token + ".wav"
            _thread.start_new_thread(TextToSpeech._play_audio, (wav_path, offset))
            offset += 0.355
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def intercept_threads(for_attach = False):
    """Redirect low-level thread creation to ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``.  If the module-level ``_threading`` is still None
    and ``threading`` is already present in ``sys.modules``, its private
    ``_start_new_thread`` is rebound as well.  Finally ``for_attach`` is
    stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    for entry_point in ('start_new_thread', 'start_new'):
        setattr(thread, entry_point, thread_creator)

    # threading is only hot-patched when it has already been imported (the
    # attach case), so that threads it starts are intercepted too.  It must
    # not be imported here otherwise: it would treat the current thread as
    # the main thread, which is wrong while attaching because this code runs
    # on a short-lived debugger attach thread.  No patch is needed in that
    # case anyway, since a later import picks up the thread.start_new_thread
    # set above.
    if _threading is None and 'threading' in sys.modules:
        import threading
        _threading = threading
        _threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach

## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def intercept_threads(for_attach = False):
    """Redirect low-level thread creation to ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``.  If the module-level ``_threading`` is still None
    and ``threading`` is already present in ``sys.modules``, its private
    ``_start_new_thread`` is rebound as well.  Finally ``for_attach`` is
    stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    for entry_point in ('start_new_thread', 'start_new'):
        setattr(thread, entry_point, thread_creator)

    # threading is only hot-patched when it has already been imported (the
    # attach case), so that threads it starts are intercepted too.  It must
    # not be imported here otherwise: it would treat the current thread as
    # the main thread, which is wrong while attaching because this code runs
    # on a short-lived debugger attach thread.  No patch is needed in that
    # case anyway, since a later import picks up the thread.start_new_thread
    # set above.
    if _threading is None and 'threading' in sys.modules:
        import threading
        _threading = threading
        _threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach

## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
test_ffi_obj.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_once_multithread():
    """Six threads call ``ffi.init_once`` with the same tag.

    The recorded events must show the initializer running exactly once,
    followed by its result (7) being recorded by all six threads.
    """
    import time
    if sys.version_info >= (3,):
        import _thread as thread
    else:
        import thread
    events = []

    def do_init():
        print('init!')
        events.append('init!')
        time.sleep(1)
        events.append('init done')
        print('init done')
        return 7

    ffi = _cffi1_backend.FFI()

    def worker():
        events.append(ffi.init_once(do_init, "tag"))

    for _ in range(6):
        thread.start_new_thread(worker, ())
    # Give the one-second initializer and all workers time to finish.
    time.sleep(1.5)
    assert events == ['init!', 'init done'] + 6 * [7]
test_ffi_obj.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_init_once_multithread_failure():
    """Three threads call ``ffi.init_once`` with an initializer that always raises.

    Every thread must see the ValueError, and the recorded events must show
    the initializer being run three times, one complete run after another.
    """
    import time
    if sys.version_info >= (3,):
        import _thread as thread
    else:
        import thread
    events = []

    def do_init():
        events.append('init!')
        time.sleep(1)
        events.append('oops')
        raise ValueError

    ffi = _cffi1_backend.FFI()

    def worker():
        py.test.raises(ValueError, ffi.init_once, do_init, "tag")

    for _ in range(3):
        thread.start_new_thread(worker, ())
    # Poll until all six events are recorded; give up after 19 polls.
    polls = 0
    while len(events) < 6:
        polls += 1
        assert polls < 20
        time.sleep(0.51)
    assert events == ['init!', 'oops'] * 3
backend_tests.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_once_multithread(self):
        """Six threads call ``ffi.init_once`` with the same tag.

        The recorded events must show the initializer running exactly once,
        followed by its result (7) being recorded by all six threads.
        """
        import sys
        import time
        if sys.version_info >= (3,):
            import _thread as thread
        else:
            import thread
        events = []

        def do_init():
            events.append('init!')
            time.sleep(1)
            events.append('init done')
            return 7

        ffi = FFI()

        def worker():
            events.append(ffi.init_once(do_init, "tag"))

        for _ in range(6):
            thread.start_new_thread(worker, ())
        # Give the one-second initializer and all workers time to finish.
        time.sleep(1.5)
        assert events == ['init!', 'init done'] + 6 * [7]
test_verify.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _run_in_multiple_threads(test1):
    """Call ``test1`` once in the calling thread, then once in each of 10 threads.

    Exceptions raised inside the worker threads are collected.  The workers
    are awaited in start order, and as soon as a collected exception is
    seen after one of those waits, the first collected exception is
    re-raised in the calling thread.
    """
    test1()
    import sys
    try:
        import thread
    except ImportError:
        import _thread as thread
    failures = []

    def worker(done):
        try:
            test1()
        except:
            # Deliberately catch everything so the lock below is always released.
            failures.append(sys.exc_info())
        done.release()

    pending = []
    for _ in range(10):
        # Each worker gets an already-held lock and releases it when finished.
        done = thread.allocate_lock()
        done.acquire()
        thread.start_new_thread(worker, (done,))
        pending.append(done)
    for done in pending:
        # Re-acquiring blocks until the matching worker has released its lock.
        done.acquire()
        if failures:
            raise failures[0][1]
bday.py 文件源码 项目:Abb1t 作者: k-freeman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, bot):
        """Set up the birthday module's state and start its two worker threads.

        Creates the ``bdays`` data directory and an empty ``chat_ids.txt``
        inside it when they do not exist yet, loads the stored chat ids via
        ``load_chat_ids_bday`` and then starts ``run`` and
        ``happy_birthday`` on separate threads.
        """
        self.bot = bot.bot
        self.description = r"*/bday* (insert|remove) _<name>_ _<dd.mm>_ - Add or remove a person to the daily birthday reminder _(removing not implemented yet)_"
        self.queue_in = Queue()
        self.data_dir = 'bdays'
        self.chat_ids = []

        # First start: make sure the directory and the chat id file exist.
        if not os.path.exists(self.data_dir):
            os.makedirs(self.data_dir)
        chat_ids_path = os.path.join(self.data_dir, "chat_ids.txt")
        if not os.path.exists(chat_ids_path):
            with open(chat_ids_path, 'a'):
                pass

        # Restore the chats that were contacted in earlier runs.
        self.load_chat_ids_bday()

        thread.start_new_thread(self.run, ())
        thread.start_new_thread(self.happy_birthday, ())
mydealz.py 文件源码 项目:Abb1t 作者: k-freeman 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, bot):
        """Set up the mydealz module's state and start its two worker threads.

        Ensures the ``mydealz`` directory exists, loads the comma-separated
        chat ids from ``./mydealz/mydealz_chat_ids`` when that file is
        present and non-empty, then starts ``run`` and ``update`` on
        separate threads.
        """
        self.bot = bot.bot
        self.description = r"""*/mydealz* - toggle to get freebie notifications, use */mydealztemp* to define a minimum heat of deals"""
        self.queue_in = Queue()
        self.chat_ids = []
        self.sent_already = [time.time()]
        self.freebies = []
        try:
            os.mkdir("mydealz")
        except OSError:
            # Typically the directory already exists; any OSError is ignored.
            pass
        self.filename = "./mydealz/mydealz_chat_ids"
        try:
            with open(self.filename) as handle:
                stored_ids = handle.read()
        except FileNotFoundError:
            # No file yet: keep the empty chat id list.
            stored_ids = ""
        if stored_ids:
            self.chat_ids = [int(piece) for piece in stored_ids.split(",")]
        thread.start_new_thread(self.run, ())
        thread.start_new_thread(self.update, ())
client.py 文件源码 项目:fastweb 作者: BSlience 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    """Run ``communicate`` on five background threads and keep the process alive.

    Each thread drives ``communicate`` to completion on the IOLoop returned
    by ``ioloop.IOLoop.current()``.  The calling thread then blocks until
    it is interrupted; the elapsed wall-clock time is printed on the way out.
    """
    import time
    import _thread
    start = time.time()

    # The worker must not be named ``_thread``: a nested function with that
    # name rebinds the module imported above, and the
    # ``_thread.start_new_thread`` call below then fails with AttributeError.
    def _worker():
        ioloop.IOLoop.current().run_sync(communicate)

    for _ in range(5):
        _thread.start_new_thread(_worker, ())

    # Threads started through _thread do not keep the interpreter alive, so
    # the main thread has to stay around.  Sleep instead of spinning so it
    # does not compete with the workers for CPU time.
    try:
        while True:
            time.sleep(1)
    finally:
        # Reached only when the wait is interrupted (e.g. Ctrl-C); the
        # exception still propagates after the elapsed time is printed.
        end = time.time()
        print((end-start))
test_socket.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _setUp(self):
        """Create the synchronisation objects, start the client thread, run the server set-up.

        The client half of a test is the method whose name is the test's
        own name prefixed with an underscore; it is handed to ``clientRun``
        on a new thread.  After the server-side set-up, ``server_ready`` is
        set (if it is not already) and the call blocks until
        ``client_ready`` is set.
        """
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = queue.Queue(1)

        # Keep only what follows the last '.' in self.id(), then look up
        # the matching '_'-prefixed client method.
        test_name = self.id().rpartition('.')[2]
        client_half = getattr(self, '_' + test_name)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (client_half,))

        self.__setUp()
        if not self.server_ready.is_set():
            self.server_ready.set()
        self.client_ready.wait()
test_threading.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_foreign_thread(self):
        # A thread started directly through _thread (a "foreign" thread)
        # must still be able to use the threading module.
        def foreign_body(done):
            # current_thread() makes threading create an entry for this
            # foreign thread in the threading._active map.
            threading.current_thread()
            done.release()

        done = threading.Lock()
        done.acquire()
        ident = _thread.start_new_thread(foreign_body, (done,))
        # Block until the foreign thread has released the lock.
        done.acquire()
        self.assertIn(ident, threading._active)
        self.assertIsInstance(threading._active[ident], threading._DummyThread)
        del threading._active[ident]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
test_socket.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _setUp(self):
        """Create the synchronisation objects, start the client thread, run the server set-up.

        The client half of a test is the method whose name is the test's
        own name prefixed with an underscore; it is handed to ``clientRun``
        on a new thread.  ``server_ready`` is set whether or not the
        server-side set-up succeeds; a failure is recorded in
        ``server_crashed`` and re-raised.  On success the call blocks until
        ``client_ready`` is set.
        """
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = queue.Queue(1)
        self.server_crashed = False

        # Keep only what follows the last '.' in self.id(), then look up
        # the matching '_'-prefixed client method.
        test_name = self.id().rpartition('.')[2]
        client_half = getattr(self, '_' + test_name)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (client_half,))

        try:
            self.__setUp()
        except:
            # Record the failure for the client side, then propagate it.
            self.server_crashed = True
            raise
        finally:
            self.server_ready.set()
        self.client_ready.wait()
test_threading.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_foreign_thread(self):
        # A thread started directly through _thread (a "foreign" thread)
        # must still be able to use the threading module.
        def foreign_body(done):
            # current_thread() makes threading create an entry for this
            # foreign thread in the threading._active map.
            threading.current_thread()
            done.release()

        done = threading.Lock()
        done.acquire()
        ident = _thread.start_new_thread(foreign_body, (done,))
        # Block until the foreign thread has released the lock.
        done.acquire()
        self.assertIn(ident, threading._active)
        self.assertIsInstance(threading._active[ident], threading._DummyThread)
        del threading._active[ident]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
test_threading.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Regression check for issue #14308: a dummy thread present in the
        # active list must not break the after-fork mechanism.  The script
        # below runs in a child interpreter and must produce no output on
        # either stream.
        script = """if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        _rc, stdout, stderr = assert_python_ok("-c", script)
        self.assertEqual(stdout, b'')
        self.assertEqual(stderr, b'')
wechat_utils.py 文件源码 项目:Neural-Headline-Generator-CN 作者: QuantumLiu 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def shutdown(self,sec,save=True,filepath='temp.h5'):
        """Optionally save the model, notify the user, then shut the computer down.

        sec: delay before shutting down, in seconds.  On non-Windows
            systems the delay is converted to whole minutes, with a
            minimum of one minute.
        save: whether to save the model before shutting down.
        filepath: path the model is saved to when ``save`` is true.

        The shutdown command runs on a separate thread.
        """
        if save:
            self.model.save(filepath, overwrite=True)
            self.t_send('Command accepted,the model has already been saved,shutting down the computer....', toUserName='filehelper')
        else:
            self.t_send('Command accepted,shutting down the computer....', toUserName='filehelper')
        if 'Windows' in platform.system():
            th.start_new_thread(system, ('shutdown -s -t %d' %sec,))
        else:
            # POSIX shutdown takes the delay as the positional "+minutes"
            # argument; "-t" is not the shutdown delay there, so the
            # requested wait was not honoured.
            minutes = max(1, int(sec / 60))
            th.start_new_thread(system, ('shutdown -h +%d' % minutes,))

#==============================================================================
#         
#==============================================================================
win32pdhquery.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def collectdatawhile(self, period=1):
        '''
        Start collecting performance data on a background thread.

        A simple flag acts as the start/stop signal: this method sets
        ``collectdatawhile_active`` and starts ``collectdatawhile_slave``
        on a new thread, passing it ``period``.  Collection is meant to
        run every ``period`` seconds until the flag is set to a non-true
        value, which is normally done by calling
        query.collectdatawhile_stop().
        Example:
            query.collectdatawhile(2)
            # returns immediately; data is now collected every two seconds.
            # ... do other work while the thread runs, then:
            query.collectdatawhile_stop()
            # It is generally a good idea to sleep for one more period
            # yourself, since the query will not copy the required data
            # until the next iteration:
            time.sleep(2)
            # the data is now available from the query's attributes
            query.curresults
            query.curpaths
        '''
        # Raise the flag before the thread starts so the worker sees it set.
        self.collectdatawhile_active = 1
        worker_args = (period,)
        _thread.start_new_thread(self.collectdatawhile_slave, worker_args)
desktopmanager.py 文件源码 项目:CodeReader 作者: jasonrbr 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_desktop(desktop_name, start_explorer=1):
    """Create a desktop, optionally start Explorer on it, and switch to it.

    A thread running ``new_icon`` is started for the desktop before
    switching.  If ``CreateDesktop`` fails, the traceback is printed and
    shown in a message box, and the function returns without doing
    anything else.
    """
    security = pywintypes.SECURITY_ATTRIBUTES()
    security.bInheritHandle = 1

    try:
        hdesk = win32service.CreateDesktop(desktop_name, 0, win32con.MAXIMUM_ALLOWED, security)
    except win32service.error:
        traceback.print_exc()
        # Capture the same traceback as text for the message box.
        report = io.StringIO()
        traceback.print_exc(None, report)
        win32api.MessageBox(0, report.getvalue(), 'Desktop creation failed')
        return
    if start_explorer:
        startup = win32process.STARTUPINFO()
        startup.lpDesktop = desktop_name
        explorer_info = win32process.CreateProcess(
            None, "Explorer.exe", None, None, True,
            win32con.CREATE_NEW_CONSOLE, None, 'c:\\', startup)

    icon_thread = _thread.start_new_thread(new_icon, (hdesk, desktop_name))
    hdesk.SwitchDesktop()
ctp_quote.py 文件源码 项目:hf_at_py 作者: haifengat 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __OnRtnDepthMarketData(
            self, pDepthMarketData=CThostFtdcDepthMarketDataField):
        """Convert a depth-market-data record into a ``Tick`` and publish it.

        The tick is stored in ``self.DicTick`` under its instrument id and
        ``self.OnRtnTick`` is invoked with it on a new thread.

        ``tick.UpdateTime`` is the trading day and update time joined by a
        space.  When the trading day is missing (None, empty or a single
        space) the local time formatted as ``'%Y%m%d %H:%M:%S'`` is used
        instead.
        """
        tick = Tick()
        tick.AskPrice = pDepthMarketData.getAskPrice1()
        tick.AskVolume = pDepthMarketData.getAskVolume1()
        tick.AveragePrice = pDepthMarketData.getAveragePrice()
        tick.BidPrice = pDepthMarketData.getBidPrice1()
        tick.BidVolume = pDepthMarketData.getBidVolume1()
        tick.Instrument = pDepthMarketData.getInstrumentID()
        tick.LastPrice = pDepthMarketData.getLastPrice()
        tick.OpenInterest = pDepthMarketData.getOpenInterest()
        tick.Volume = pDepthMarketData.getVolume()

        day = pDepthMarketData.getTradingDay()
        # Check for a missing day BEFORE concatenating: building the string
        # first raises TypeError when the day is None, so the fallback
        # could never be reached.
        if not day or day == ' ':
            update_time = time.strftime('%Y%m%d %H:%M:%S', time.localtime())
        else:
            update_time = day + ' ' + pDepthMarketData.getUpdateTime()
        tick.UpdateTime = update_time
        self.DicTick[tick.Instrument] = tick
        # The callback runs on its own thread rather than in this handler.
        _thread.start_new_thread(self.OnRtnTick, (tick, ))
ctp_trade.py 文件源码 项目:hf_at_py 作者: haifengat 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __OnRspUserLogin(self,
                         pRspUserLogin=CThostFtdcRspUserLoginField(),
                         pRspInfo=CThostFtdcRspInfoField,
                         nRequestID=int,
                         bIsLast=bool):
        """Handle the login response.

        Copies user id, broker id, session id and trading day from
        ``pRspUserLogin`` onto ``self``.  When ``pRspInfo`` carries a
        non-zero error id, an ``InfoField`` with that error is passed to
        ``self.OnRspUserLogin``.  Otherwise the settlement info is
        confirmed and, unless ``self.qryStart`` is already true, the
        private ``__qry`` method is started on a new thread after a
        0.5 second pause.
        """
        self.Investor = pRspUserLogin.getUserID()
        self.BrokerID = pRspUserLogin.getBrokerID()
        self.SessionID = pRspUserLogin.getSessionID()
        self.TradingDay = pRspUserLogin.getTradingDay()

        if pRspInfo.getErrorID() != 0:
            failure = InfoField()
            failure.ErrorID = pRspInfo.getErrorID()
            failure.ErrorMsg = pRspInfo.getErrorMsg()
            self.OnRspUserLogin(failure)
            return

        self.t.ReqSettlementInfoConfirm(self.BrokerID, self.Investor)
        if self.qryStart:
            return
        time.sleep(0.5)
        # NOTE(review): presumably starts the background query loop -- the
        # original comment here was unreadable; confirm against __qry.
        _thread.start_new_thread(self.__qry, ())
ctp_trade.py 文件源码 项目:hf_at_py 作者: haifengat 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __OnErrOrder(self,
                     pInputOrder=CThostFtdcInputOrderField,
                     pRspInfo=CThostFtdcRspInfoField):
        """Handle an order-insertion error.

        Looks the order up in ``self.DicOrderField`` under the key
        ``'<SessionID>|0|<OrderRef>'``.  If it is found and marked local,
        its status is set to ``OrderStatus.Error`` with an
        ``'<ErrorID>:<ErrorMsg>'`` status message, and
        ``self.OnRtnErrOrder`` is invoked with the order and the error
        info on a new thread.
        """
        order_key = '{0}|{1}|{2}'.format(self.SessionID, '0',
                                         pInputOrder.getOrderRef())
        order = self.DicOrderField.get(order_key)

        error = InfoField()
        error.ErrorID = pRspInfo.getErrorID()
        error.ErrorMsg = pRspInfo.getErrorMsg()

        if not (order and order.IsLocal):
            return
        order.Status = OrderStatus.Error
        order.StatusMsg = '{0}:{1}'.format(pRspInfo.getErrorID(),
                                           pRspInfo.getErrorMsg())
        _thread.start_new_thread(self.OnRtnErrOrder, (order, error))
ctp_socketio.py 文件源码 项目:hf_at_py 作者: haifengat 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def q_OnRspUserLogin(self, info=InfoField):
        """Push the login result and the current trading state to the client.

        Emits, in order: the login info, the account, the instrument list,
        the position list, every known order and every known trade.  Then
        ``self.OnData`` is started on a new thread.
        """
        logger.info('quote' + info.__str__())
        self.io_emit('rsp_login', info.__dict__)

        self.io_emit('rsp_account', self.t.Account.__dict__)

        # Each snapshot list is sent once, after it is complete.  Emitting
        # inside the collection loop re-sent the growing list for every
        # entry.  As before, nothing is sent when there are no entries.
        instruments = [self.t.DicInstrument[key].__dict__
                       for key in self.t.DicInstrument]
        if instruments:
            self.io_emit('rsp_instrument', instruments)

        positions = [self.t.DicPositionField[key].__dict__
                     for key in self.t.DicPositionField]
        if positions:
            self.io_emit('rsp_position', positions)

        for key in self.t.DicOrderField:
            self.io_emit('rtn_order', self.t.DicOrderField[key].__dict__)
        for key in self.t.DicTradeField:
            self.io_emit('rtn_trade', self.t.DicTradeField[key].__dict__)
        # NOTE(review): the original comment here was unreadable; OnData
        # presumably begins streaming data -- confirm.
        _thread.start_new_thread(self.OnData, ())
visualstudio_py_debugger.py 文件源码 项目:HomeAutomation 作者: gs2671 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def intercept_threads(for_attach = False):
    """Redirect low-level thread creation to ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``.  If the module-level ``_threading`` is still None
    and ``threading`` is already present in ``sys.modules``, its private
    ``_start_new_thread`` is rebound as well.  Finally ``for_attach`` is
    stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    for entry_point in ('start_new_thread', 'start_new'):
        setattr(thread, entry_point, thread_creator)

    # threading is only hot-patched when it has already been imported (the
    # attach case), so that threads it starts are intercepted too.  It must
    # not be imported here otherwise: it would treat the current thread as
    # the main thread, which is wrong while attaching because this code runs
    # on a short-lived debugger attach thread.  No patch is needed in that
    # case anyway, since a later import picks up the thread.start_new_thread
    # set above.
    if _threading is None and 'threading' in sys.modules:
        import threading
        _threading = threading
        _threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
test_socket.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _setUp(self):
        """Create the synchronisation objects, start the client thread, run the server set-up.

        The client half of a test is the method whose name is the test's
        own name prefixed with an underscore; it is handed to ``clientRun``
        on a new thread.  ``server_ready`` is set whether or not the
        server-side set-up succeeds; a failure is recorded in
        ``server_crashed`` and re-raised.  On success the call blocks until
        ``client_ready`` is set.
        """
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = queue.Queue(1)
        self.server_crashed = False

        # Keep only what follows the last '.' in self.id(), then look up
        # the matching '_'-prefixed client method.
        test_name = self.id().rpartition('.')[2]
        client_half = getattr(self, '_' + test_name)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (client_half,))

        try:
            self.__setUp()
        except:
            # Record the failure for the client side, then propagate it.
            self.server_crashed = True
            raise
        finally:
            self.server_ready.set()
        self.client_ready.wait()
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_foreign_thread(self):
        # A thread started directly through _thread (a "foreign" thread)
        # must still be able to use the threading module.
        def foreign_body(done):
            # current_thread() makes threading create an entry for this
            # foreign thread in the threading._active map.
            threading.current_thread()
            done.release()

        done = threading.Lock()
        done.acquire()
        ident = _thread.start_new_thread(foreign_body, (done,))
        # Block until the foreign thread has released the lock.
        done.acquire()
        self.assertIn(ident, threading._active)
        self.assertIsInstance(threading._active[ident], threading._DummyThread)
        del threading._active[ident]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Regression check for issue #14308: a dummy thread present in the
        # active list must not break the after-fork mechanism.  The script
        # below runs in a child interpreter and must produce no output on
        # either stream.
        script = """if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        _rc, stdout, stderr = assert_python_ok("-c", script)
        self.assertEqual(stdout, b'')
        self.assertEqual(stderr, b'')
__init__.py 文件源码 项目:network-pj-chatroom 作者: KevinWang15 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run():
    """Start the chat client: connect, start the socket listener, show the login form.

    The Tk root window is created first and stored in
    ``client.memory.tk_root``.  If the secure channel cannot be
    established, an error box is shown and the process exits with
    status 1.  Otherwise the listener thread is started, the login form is
    opened in its own top-level window, and the hidden root window runs the
    Tk main loop until it ends.
    """
    main_window = tk.Tk()
    client.memory.tk_root = main_window

    try:
        client.memory.sc = establish_secure_channel_to_server()
    except ConnectionError:
        messagebox.showerror("???", "????????")
        exit(1)

    listener_args = (client.memory.sc, main_window)
    _thread.start_new_thread(
        client.util.socket_listener.socket_listener_thread, listener_args)

    login_window = tk.Toplevel()
    LoginForm(master=login_window)

    # The root window stays hidden; only the login window is shown.
    main_window.withdraw()
    main_window.mainloop()
    try:
        main_window.destroy()
    except tk.TclError:
        # Raised when the window is already gone; nothing left to clean up.
        pass
visualstudio_py_debugger.py 文件源码 项目:xidian-sfweb 作者: Gear420 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def intercept_threads(for_attach = False):
    """Redirect low-level thread creation to ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``.  If the module-level ``_threading`` is still None
    and ``threading`` is already present in ``sys.modules``, its private
    ``_start_new_thread`` is rebound as well.  Finally ``for_attach`` is
    stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    for entry_point in ('start_new_thread', 'start_new'):
        setattr(thread, entry_point, thread_creator)

    # threading is only hot-patched when it has already been imported (the
    # attach case), so that threads it starts are intercepted too.  It must
    # not be imported here otherwise: it would treat the current thread as
    # the main thread, which is wrong while attaching because this code runs
    # on a short-lived debugger attach thread.  No patch is needed in that
    # case anyway, since a later import picks up the thread.start_new_thread
    # set above.
    if _threading is None and 'threading' in sys.modules:
        import threading
        _threading = threading
        _threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
visualstudio_py_debugger.py 文件源码 项目:skojjt 作者: martin-green 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def intercept_threads(for_attach = False):
    """Redirect low-level thread creation to ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``.  If the module-level ``_threading`` is still None
    and ``threading`` is already present in ``sys.modules``, its private
    ``_start_new_thread`` is rebound as well.  Finally ``for_attach`` is
    stored in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    for entry_point in ('start_new_thread', 'start_new'):
        setattr(thread, entry_point, thread_creator)

    # threading is only hot-patched when it has already been imported (the
    # attach case), so that threads it starts are intercepted too.  It must
    # not be imported here otherwise: it would treat the current thread as
    # the main thread, which is wrong while attaching because this code runs
    # on a short-lived debugger attach thread.  No patch is needed in that
    # case anyway, since a later import picks up the thread.start_new_thread
    # set above.
    if _threading is None and 'threading' in sys.modules:
        import threading
        _threading = threading
        _threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach


问题


面经


文章

微信
公众号

扫码关注公众号