python类start_new_thread()的实例源码

qiubaiadult.py 文件源码 项目:Python 作者: Guzi219 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def Start(self):
        self.enable = True
        page = self.page
        print u'????????......'
        # ????????????????
        thread.start_new_thread(self.LoadPage, ())
        time.sleep(2) #wait the sub thread to be done.
        # ----------- ???????? -----------
        while self.enable:
            # ??self?page???????
            if len(self.pages) > 0:
                now_page_items = self.pages[0]

                # del now page items
                del self.pages[0]
        print '---main thred --', page
                self.ShowOnePage(now_page_items, page)
                page += 1

        print self.enable


# ----------- ?????? -----------
test_verify1.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _run_in_multiple_threads(test1):
    test1()
    import sys
    try:
        import thread
    except ImportError:
        import _thread as thread
    errors = []
    def wrapper(lock):
        try:
            test1()
        except:
            errors.append(sys.exc_info())
        lock.release()
    locks = []
    for i in range(10):
        _lock = thread.allocate_lock()
        _lock.acquire()
        thread.start_new_thread(wrapper, (_lock,))
        locks.append(_lock)
    for _lock in locks:
        _lock.acquire()
        if errors:
            raise errors[0][1]
qiushimm.py 文件源码 项目:Python 作者: Guzi219 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def Start(self):
        self.enable = True
        page = self.page
        print u'????????......'
        # ????????????????
        thread.start_new_thread(self.LoadPage, ())
        time.sleep(2) #wait the sub thread to be done.
        # ----------- ???????? -----------
        while self.enable:
            # ??self?page???????
            if len(self.pages) > 0:
                now_page_items = self.pages[0]

                # del now page items
                del self.pages[0]

                self.ShowOnePage(now_page_items, page)
                page += 1

        print self.enable


# ----------- ?????? -----------
recipe-502204.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __proxy(self):
        """Accept connections on the bound address and serve each on a new thread.

        For every accepted client the status flag is checked under the lock.
        When the flag is false the listening socket is closed, the thread
        marker is cleared and the loop ends.  Otherwise a socket is connected
        to ``self.__connect`` and the pair is handed to ``self.__serve`` on a
        new thread.
        """
        listener = _socket.socket(self.FAMILY, self.TYPE)
        listener.bind(self.__bind)
        listener.listen(5)
        while True:
            inbound = listener.accept()[0]
            # Read the status flag and perform shutdown bookkeeping while
            # holding the lock.
            self.__lock.acquire()
            running = self.__status
            if not running:
                listener.close()
                self.__thread = False
            self.__lock.release()
            if not running:
                break
            outbound = _socket.socket(self.FAMILY, self.TYPE)
            outbound.connect(self.__connect)
            _thread.start_new_thread(self.__serve, (inbound, outbound))
recipe-496746.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def exec_timed(code, context, timeout_secs):
    """
    Dynamically execute 'code' using 'context' as the global enviroment.
    SafeEvalTimeoutException is raised if execution does not finish within
    the given timelimit.
    """
    assert(timeout_secs > 0)

    signal_finished = False

    def alarm(secs):
        def wait(secs):
            for n in xrange(timeout_secs):
                time.sleep(1)
                if signal_finished: break
            else:
                thread.interrupt_main()
        thread.start_new_thread(wait, (secs,))

    try:
        alarm(timeout_secs)
        exec code in context
        signal_finished = True
    except KeyboardInterrupt:
        raise SafeEvalTimeoutException(timeout_secs)
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def intercept_threads(for_attach = False):
    """Redirect low-level thread creation to ``thread_creator``.

    Replaces ``thread.start_new_thread`` and ``thread.start_new``, patches
    ``threading._start_new_thread`` when ``threading`` is already loaded, and
    records ``for_attach`` in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread_creator
    thread.start_new = thread_creator

    # When threading is already imported (the attach case) its private
    # _start_new_thread must be hot-patched so threads it starts are
    # intercepted too.
    #
    # When threading is not imported yet it must NOT be imported here: it
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach
    # thread.  No patch is needed in that case anyway, since threading picks
    # up the replaced thread.start_new_thread when it is eventually imported.
    if _threading is None and 'threading' in sys.modules:
        import threading as _threading
        _threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach

## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def intercept_threads(for_attach = False):
    """Redirect low-level thread creation to ``thread_creator``.

    Replaces ``thread.start_new_thread`` and ``thread.start_new``, patches
    ``threading._start_new_thread`` when ``threading`` is already loaded, and
    records ``for_attach`` in the module-level ``_INTERCEPTING_FOR_ATTACH``.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread_creator
    thread.start_new = thread_creator

    # When threading is already imported (the attach case) its private
    # _start_new_thread must be hot-patched so threads it starts are
    # intercepted too.
    #
    # When threading is not imported yet it must NOT be imported here: it
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach
    # thread.  No patch is needed in that case anyway, since threading picks
    # up the replaced thread.start_new_thread when it is eventually imported.
    if _threading is None and 'threading' in sys.modules:
        import threading as _threading
        _threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach

## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
win32pdhquery.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def collectdatawhile(self, period=1):
        '''
        Start threaded collection of performance data.

        Sets the ``collectdatawhile_active`` semaphore attribute and starts
        ``collectdatawhile_slave`` on a new thread, passing it ``period``.
        Control returns to the caller immediately.  Collection runs every
        ``period`` seconds until the semaphore attribute is set to a
        non-true value, which should normally be done by calling
        query.collectdatawhile_stop().

        Example:
            query.collectdatawhile(2)
            # collection now runs every two seconds in the background;
            # do other work, then stop it:
            query.collectdatawhile_stop()
            # It is generally a good idea to sleep for ``period`` seconds
            # yourself, since the query will not copy the required data
            # until the next iteration:
            time.sleep(2)
            # the data is now available from the query's attributes
            query.curresults
            query.curpaths
        '''
        worker_args = (period,)
        self.collectdatawhile_active = 1
        thread.start_new_thread(self.collectdatawhile_slave, worker_args)
test_ffi_obj.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_init_once_multithread():
    """Six threads call ``init_once`` with the same tag; the initializer must run once.

    Expects a single 'init!' / 'init done' pair followed by the value 7
    recorded by each of the six threads.
    """
    if sys.version_info < (3,):
        import thread
    else:
        import _thread as thread
    import time

    def do_init():
        print('init!')
        events.append('init!')
        time.sleep(1)
        events.append('init done')
        print('init done')
        return 7

    ffi = _cffi1_backend.FFI()
    events = []

    def call_init_once():
        value = ffi.init_once(do_init, "tag")
        events.append(value)

    for _ in range(6):
        thread.start_new_thread(call_init_once, ())
    # The initializer sleeps for 1 second; wait a little longer than that.
    time.sleep(1.5)
    assert events == ['init!', 'init done'] + [7] * 6
test_ffi_obj.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_once_multithread_failure():
    """Three threads call ``init_once`` with an initializer that always raises.

    Expects three 'init!' / 'oops' pairs in sequence, one per thread.
    """
    if sys.version_info < (3,):
        import thread
    else:
        import _thread as thread
    import time

    def do_init():
        events.append('init!')
        time.sleep(1)
        events.append('oops')
        raise ValueError

    ffi = _cffi1_backend.FFI()
    events = []

    def expect_failure():
        py.test.raises(ValueError, ffi.init_once, do_init, "tag")

    for _ in range(3):
        thread.start_new_thread(expect_failure, ())

    # Poll until all six events are recorded, giving up after 19 polls of
    # 0.51 seconds each.
    polls = 0
    while len(events) < 6:
        polls += 1
        assert polls < 20
        time.sleep(0.51)
    assert events == ['init!', 'oops'] * 3
backend_tests.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_init_once_multithread(self):
        """Six threads call ``init_once`` with the same tag; the initializer must run once.

        Expects a single 'init!' / 'init done' pair followed by the value 7
        recorded by each of the six threads.
        """
        import sys, time
        if sys.version_info < (3,):
            import thread
        else:
            import _thread as thread

        def do_init():
            events.append('init!')
            time.sleep(1)
            events.append('init done')
            return 7

        ffi = FFI()
        events = []

        def call_init_once():
            value = ffi.init_once(do_init, "tag")
            events.append(value)

        for _ in range(6):
            thread.start_new_thread(call_init_once, ())
        # The initializer sleeps for 1 second; wait a little longer than that.
        time.sleep(1.5)
        assert events == ['init!', 'init done'] + [7] * 6
test_verify.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _run_in_multiple_threads(test1):
    test1()
    import sys
    try:
        import thread
    except ImportError:
        import _thread as thread
    errors = []
    def wrapper(lock):
        try:
            test1()
        except:
            errors.append(sys.exc_info())
        lock.release()
    locks = []
    for i in range(10):
        _lock = thread.allocate_lock()
        _lock.acquire()
        thread.start_new_thread(wrapper, (_lock,))
        locks.append(_lock)
    for _lock in locks:
        _lock.acquire()
        if errors:
            raise errors[0][1]
player_monitor.py 文件源码 项目:plugin.audio.spotify 作者: marcelveldt 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self):
        """Keep the Spotify Connect daemon running until an exit is requested.

        Outer loop: start the daemon through ``self.__spotty.run_spotty`` and
        start ``self.fill_fake_buffer`` on a new thread.  Inner loop: forward
        each non-empty stderr line to the debug log and leave the inner loop
        (so the daemon is started again) when the process reports a positive
        return code.  ``self.daemon_active`` is true while this method runs.
        """
        self.daemon_active = True
        while not self.__exit:
            log_msg("Start Spotify Connect Daemon")
            #spotty_args = ["-v"]
            spotty_args = ["--onstart", "curl -s -f -m 2 http://localhost:%s/playercmd/start" % PROXY_PORT,
                       "--onstop", "curl -s -f -m 2 http://localhost:%s/playercmd/stop" % PROXY_PORT]
            self.__spotty_proc = self.__spotty.run_spotty(arguments=spotty_args)
            thread.start_new_thread(self.fill_fake_buffer, ())
            while not self.__exit:
                line = self.__spotty_proc.stderr.readline().strip()
                if line:
                    log_msg(line, xbmc.LOGDEBUG)
                # NOTE(review): assumes run_spotty returns a subprocess.Popen
                # -- confirm.  Popen.returncode stays None until poll() or
                # wait() is called, so without this refresh the crash check
                # below could never trigger.
                self.__spotty_proc.poll()
                if self.__spotty_proc.returncode and self.__spotty_proc.returncode > 0 and not self.__exit:
                    # daemon crashed ? restart ?
                    break
        self.daemon_active = False
        log_msg("Stopped Spotify Connect Daemon")
discord_handler.py 文件源码 项目:PokemonGo-Bot 作者: PokemonGoF 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle_event(self, event, sender, level, formatted_msg, data):
        """Forward a bot event to the Discord master, starting the Discord bot on first use.

        If ``self.dbot`` is not set, a ``DiscordClass`` instance is created,
        connected, and its ``run`` method is started on a new thread.  Any
        exception during that start-up is logged, ``self.dbot`` is reset to
        None and the event is dropped.  Otherwise the event is converted by
        ``self.chat_handler.get_event`` and, when that yields a message, it
        is sent to ``self.master``.

        ``sender`` and ``level`` are accepted but not used here.
        """
        if self.dbot is None:
            try:
                self.bot.logger.info("Discord bot not running, Starting..")
                self.dbot = DiscordClass(self.bot, self.master, self.pokemons, self.config)
                self.dbot.connect()
                # start_new_thread requires the argument tuple even when it is
                # empty; omitting it raised TypeError, which the handler below
                # swallowed, so the bot never started.
                thread.start_new_thread(self.dbot.run, ())
            except Exception as inst:
                self.dbot = None
                self.bot.logger.error("Unable to start Discord bot; master: {}, exception: {}".format(self.master, pprint.pformat(inst)))
                return
        # prepare message to send
        msg = self.chat_handler.get_event(event, formatted_msg, data)
        if msg:
            self.dbot.sendMessage(to=self.master, text=msg)
QQ.py 文件源码 项目:PythonQQ 作者: zhang0chao0 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def main(udpCliSock1):
    """Build the chat window, start the ``send`` thread and run the Tk main loop.

    Publishes three module-level names: ``udpCliSock`` (a new UDP socket),
    ``text_msglist`` (the message Text widget) and ``friend_account`` (the
    StringVar bound to the account entry field).
    """
    global udpCliSock, text_msglist, friend_account

    # NOTE(review): the socket passed in is stored and then immediately
    # replaced by a fresh UDP socket, so the argument has no lasting effect.
    # Confirm which of the two assignments is intended.
    udpCliSock = udpCliSock1
    udpCliSock = socket(AF_INET, SOCK_DGRAM)

    window = Tk()
    window.title('QQ???')

    # Top-left frame holding the message list; fixed at 300x200.
    history_frame = Frame(width=300, height=200, bg='white')
    history_frame.grid(row=0, column=0, padx=2, pady=5)
    history_frame.grid_propagate(0)
    text_msglist = Text(history_frame)
    text_msglist.grid()

    # Label, entry field and button stacked in rows 1 to 3.
    account_label = Label(window, text='???', width=8)
    account_label.grid(row=1, column=0)
    friend_account = StringVar()
    account_entry = Entry(window, textvariable=friend_account)
    account_entry.grid(row=2, column=0)
    add_button = Button(window, text='????', command=add_friend)
    add_button.grid(row=3, column=0)

    # Run send() on its own thread, then hand control to the Tk event loop.
    thread.start_new_thread(send, ())
    window.mainloop()
tools.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def completion(callback):
    """
    Decorator factory: run ``callback`` in a new thread after the action returns.

    Example:
        Use as::

            from gluon.tools import completion
            @completion(lambda d: logging.info(repr(d)))
            def index():
                return dict(message='hello')

    This logs the output of ``index`` every time it is called.  ``callback``
    receives the action's return value, or None when the action raised, and
    is executed in a new thread.
    """
    def _completion(f):
        def __completion(*args, **kwargs):
            result = None
            try:
                result = f(*args, **kwargs)
            finally:
                # Runs on both the normal and the exception path.
                thread.start_new_thread(callback, (result,))
            return result
        return __completion
    return _completion
tools.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_thread_separation():
    """Set ``x`` on a PluginManager in a second thread and return the main thread's ``x``.

    The main thread sets ``a.x = 5`` and a worker thread sets ``c.x = 7`` on
    its own ``PluginManager()``.  ``lock1`` makes the worker's write happen
    after the main thread's write.  ``lock2`` makes the main thread wait
    until the worker has finished before ``a.x`` is read and returned.

    Previously both threads acquired ``lock2``, so the main thread could take
    it first and return before the worker had written anything, leaving the
    worker blocked forever.
    """
    def f():
        try:
            c = PluginManager()
            lock1.acquire()  # blocks until the main thread has set a.x
            c.x = 7
            lock1.release()
        finally:
            # Signal completion even if the worker failed, so the main thread
            # cannot hang.
            lock2.release()
    lock1 = thread.allocate_lock()
    lock2 = thread.allocate_lock()
    lock1.acquire()
    lock2.acquire()  # held until the worker releases it
    thread.start_new_thread(f, ())
    a = PluginManager()
    a.x = 5
    lock1.release()
    lock2.acquire()  # wait for the worker to finish
    return a.x
IPStream.py 文件源码 项目:serbian-alpr 作者: golubaca 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def tst(self, frame, image):
        try:
            name = "/tmp/{}_{}.jpg".format(time.time(), self.camera_name)
            cv2.imwrite(name, frame)
        except Exception as e:
            print "snap:", e
            return False

        try:
            thread.start_new_thread(
                self.analize_plate.proccess,
                (image,
                 name,
                 self.camera_name,
                 self.image_location,
                 self.thumbnail_location))
        except:
            return False
input_rot_only.py 文件源码 项目:orangepi-radio 作者: thk4711 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init():
    """Set up wiringpi, configure the input pins and start the two worker threads.

    Every pin used here is set to ``INPUT`` mode with ``PUD_UP``.  Afterwards
    ``encoder_loop`` and ``keypressd`` (given ``lirc_device``) are each
    started on a new thread.
    """
    print(lirc_device)
    wiringpi.wiringPiSetup()

    # First group, second group, then the power pin -- configured in that order.
    input_pins = (
        A_1_PIN, B_1_PIN, BTN_1_PIN,
        A_2_PIN, B_2_PIN, BTN_2_PIN,
        POWER_PIN,
    )
    for pin in input_pins:
        wiringpi.pinMode(pin, INPUT)
        wiringpi.pullUpDnControl(pin, PUD_UP)

    thread.start_new_thread(encoder_loop, ())
    thread.start_new_thread(keypressd, (lirc_device, ))
tools.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def completion(callback):
    """
    Decorator factory: run ``callback`` in a new thread after the action returns.

    Example:
        Use as::

            from gluon.tools import completion
            @completion(lambda d: logging.info(repr(d)))
            def index():
                return dict(message='hello')

    This logs the output of ``index`` every time it is called.  ``callback``
    receives the action's return value, or None when the action raised, and
    is executed in a new thread.
    """
    def _completion(f):
        def __completion(*args, **kwargs):
            result = None
            try:
                result = f(*args, **kwargs)
            finally:
                # Runs on both the normal and the exception path.
                thread.start_new_thread(callback, (result,))
            return result
        return __completion
    return _completion
tools.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_thread_separation():
    """Set ``x`` on a PluginManager in a second thread and return the main thread's ``x``.

    The main thread sets ``a.x = 5`` and a worker thread sets ``c.x = 7`` on
    its own ``PluginManager()``.  ``lock1`` makes the worker's write happen
    after the main thread's write.  ``lock2`` makes the main thread wait
    until the worker has finished before ``a.x`` is read and returned.

    Previously both threads acquired ``lock2``, so the main thread could take
    it first and return before the worker had written anything, leaving the
    worker blocked forever.
    """
    def f():
        try:
            c = PluginManager()
            lock1.acquire()  # blocks until the main thread has set a.x
            c.x = 7
            lock1.release()
        finally:
            # Signal completion even if the worker failed, so the main thread
            # cannot hang.
            lock2.release()
    lock1 = thread.allocate_lock()
    lock2 = thread.allocate_lock()
    lock1.acquire()
    lock2.acquire()  # held until the worker releases it
    thread.start_new_thread(f, ())
    a = PluginManager()
    a.x = 5
    lock1.release()
    lock2.acquire()  # wait for the worker to finish
    return a.x
main.py 文件源码 项目:kivy-noveling 作者: liyuanrui 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def start(self):
        """Start the download thread, or stop when the button text already matches.

        Without a chapter URL only a message is shown in ``novelshow``.
        Otherwise the text of ``noveldown`` selects the branch: when it
        differs from the compared literal, the target directory is created if
        needed, the output path is stored in ``self.downdir``, the button
        text is changed and ``self.newthread`` is started on a new thread;
        when it is equal, ``self.stop()`` is called.  (The UI string literals
        are mis-encoded in this copy of the source, so their wording is not
        described here.)
        """
        if not self.chapterurl:
            self.novelshow.text='???????'
            return

        if encode(self.noveldown.text) != '??':
            # Drop empty path segments and rebuild the directory as an
            # absolute path.
            segments = [seg for seg in decode(self.noveldir.text).split('/') if seg]
            target_dir = encode('/' + '/'.join(segments))
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)
            # Output file: <target_dir>/<novel name>.txt
            self.downdir = target_dir + '/' + encode(self.novelname.text) + '.txt'
            self.novelshow.text = self.downdir + '\n'
            self.noveldown.text = '??'
            thread.start_new_thread(self.newthread, ())
        else:
            self.stop()

    #????
test_socket.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _setUp(self):
        """Create the synchronisation objects, start the client thread, then run the real setUp.

        The client-side method is looked up as ``'_' + <test method name>``
        and passed to ``self.clientRun`` on a new thread.  After
        ``self.__setUp()`` the server-ready event is set if it was not
        already, and the call blocks until the client-ready event is set.
        """
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = Queue.Queue(1)

        # The client half of a test lives in the method named after the last
        # dotted component of the test id, prefixed with an underscore.
        test_name = self.id().rsplit('.', 1)[-1]
        client_test = getattr(self, '_' + test_name)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (client_test,))

        self.__setUp()
        if not self.server_ready.is_set():
            self.server_ready.set()
        self.client_ready.wait()
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_foreign_thread(self):
        # A thread started through the low-level thread module (a "foreign"
        # thread) must still be able to use the threading module.
        def worker(finished):
            # current_thread() forces an entry for this foreign thread to be
            # created in the threading._active map.
            threading.current_thread()
            finished.release()

        finished = threading.Lock()
        finished.acquire()
        ident = thread.start_new_thread(worker, (finished,))
        # Blocks until the worker releases the lock, i.e. until it is done.
        finished.acquire()
        self.assertIn(ident, threading._active)
        self.assertIsInstance(threading._active[ident], threading._DummyThread)
        # Remove the dummy entry again.
        del threading._active[ident]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a thread through the low-level thread
        # module, has it register itself with threading.current_thread(),
        # and then forks.  The parent asserts two active threads before the
        # fork; the child asserts exactly one afterwards.
        code = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # Presumably assert_python_ok runs the script with "python -c" in a
        # separate interpreter and fails on a non-zero exit -- confirm against
        # the helper.  In addition, nothing may be written to stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, '')
        self.assertEqual(err, '')
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_foreign_thread(self):
        # A thread started through the low-level thread module (a "foreign"
        # thread) must still be able to use the threading module.
        def worker(finished):
            # current_thread() forces an entry for this foreign thread to be
            # created in the threading._active map.
            threading.current_thread()
            finished.release()

        finished = threading.Lock()
        finished.acquire()
        ident = thread.start_new_thread(worker, (finished,))
        # Blocks until the worker releases the lock, i.e. until it is done.
        finished.acquire()
        self.assertIn(ident, threading._active)
        self.assertIsInstance(threading._active[ident], threading._DummyThread)
        # Remove the dummy entry again.
        del threading._active[ident]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        #
        # The script below starts a thread through the low-level thread
        # module, has it register itself with threading.current_thread(),
        # and then forks.  The parent asserts two active threads before the
        # fork; the child asserts exactly one afterwards.
        code = """if 1:
            import thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        # Presumably assert_python_ok runs the script with "python -c" in a
        # separate interpreter and fails on a non-zero exit -- confirm against
        # the helper.  In addition, nothing may be written to stdout or stderr.
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, '')
        self.assertEqual(err, '')
speech2scpi.py 文件源码 项目:speech2scpi 作者: patricksebastien 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def evaluate(calling):
    """Send the command in the ``text`` widget to every discovered instrument.

    The command is read once from the ``text`` widget.  For each entry in
    ``results`` a connection is requested for the entry's address and port.
    A command containing "?" is treated as a query: the reply is shown in
    ``textb`` and, when ``ttsenabled`` is set, passed to ``tts`` on a new
    thread.  Any other command is sent with ``send_scpi``.  The input widget
    is cleared at the end.

    ``calling`` is accepted but not used.
    """
    c = str(text.get("1.0",END)).strip()
    for result in results:
        address = socket.inet_ntoa(result['zc_info'].address)
        print('Trying to connect to {}...'.format(address))
        scpi_connection = get_scpi_connection_tuple((address, result['zc_info'].port))

        # Compare by value.  The previous ``is not (None, None)`` tested
        # object identity against a new tuple, which is effectively always
        # true, so failed connections were never skipped.
        # (Presumably (None, None) is the helper's failure result -- confirm.)
        if scpi_connection != (None, None):
            if "?" in c:
                rs = receive_scpi(scpi_connection, c)
                textb.delete("1.0", END)
                # Replies classified as "float" are normalised through float().
                if strType(rs) == "float":
                    t = str(float(rs))
                else:
                    t = str(rs)
                textb.insert(END, t)
                if ttsenabled.get():
                    thread.start_new_thread(tts, (t,))
            else:
                send_scpi(scpi_connection, c)
    text.delete("1.0", END)
test_additional_thread_info.py 文件源码 项目:specto 作者: mrknow 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_start_new_thread(self):
        """With the thread modules patched, start_new_thread must still pass args and kwargs through."""
        pydev_monkey.patch_thread_modules()
        try:
            captured = {}

            def target(a, b, *args, **kwargs):
                captured.update(a=a, b=b, args=args, kwargs=kwargs)

            thread.start_new_thread(target, (1,2,3,4), {'d':1, 'e':2})
            import time
            # Poll up to 20 times, 0.1 seconds apart, for the thread to record
            # all four values.
            for _attempt in xrange(20):
                if len(captured) == 4:
                    break
                time.sleep(.1)
            else:
                raise AssertionError('Could not get to condition before 2 seconds')

            expected = {'a': 1, 'b': 2, 'args': (3, 4), 'kwargs': {'e': 2, 'd': 1}}
            self.assertEqual(expected, captured)
        finally:
            pydev_monkey.undo_patch_thread_modules()
out.py 文件源码 项目:txt2evernote 作者: Xunius 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def launch():
        """Reset the preloader state and start ``preloader.draw`` on a new thread.

        Does nothing when ``config.IS_OUT_TERMINAL`` is false.
        """
        if config.IS_OUT_TERMINAL:
            preloader.counter = 0
            preloader.isLaunch = True
            thread.start_new_thread(preloader.draw, ())


问题


面经


文章

微信
公众号

扫码关注公众号