python类error()的实例源码

thread.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def acquire(self, blocking=True, timeout=-1):
            # Transform the default -1 argument into the None that our
            # semaphore implementation expects, and raise the same error
            # the stdlib implementation does.
            if timeout == -1:
                timeout = None
            if not blocking and timeout is not None:
                raise ValueError("can't specify a timeout for a non-blocking call")
            if timeout is not None:
                if timeout < 0:
                    # in C: if(timeout < 0 && timeout != -1)
                    raise ValueError("timeout value must be strictly positive")
                if timeout > self._TIMEOUT_MAX:
                    raise OverflowError('timeout value is too large')

            return BoundedSemaphore.acquire(self, blocking, timeout)
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _run_and_join(self, script):
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
        rc = p.wait()
        data = p.stdout.read().replace('\r', '')
        p.stdout.close()
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0, "Unexpected error")
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _run_and_join(self, script):
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
        rc = p.wait()
        data = p.stdout.read().replace('\r', '')
        p.stdout.close()
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0, "Unexpected error")
test_threading.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _run_and_join(self, script):
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
        rc = p.wait()
        data = p.stdout.read().replace('\r', '')
        p.stdout.close()
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0, "Unexpected error")
test_threading.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _run_and_join(self, script):
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
        rc = p.wait()
        data = p.stdout.read().replace('\r', '')
        p.stdout.close()
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertFalse(rc == 2, "interpreter was blocked")
        self.assertTrue(rc == 0, "Unexpected error")
threading.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def wait(self, timeout=None):
        self.__cond.acquire()
        try:
            if not self.__flag:
                self.__cond.wait(timeout)
            return self.__flag
        finally:
            # NOTE(google) Added the try/except. This handles the possibility of
            # an asynchronous exception (e.g., DeadlineExceededError) being
            # thrown inside of Condition.wait such that it does not re-acquire
            # the lock, causing a ThreadError 'release unlocked lock' to be
            # raised by Condition.release.
            # It is safe to ignore such an error, because it means the lock is
            # already released.
            try:
                self.__cond.release()
            except ThreadError:
                pass

# Helper to generate new thread names
threading.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def start(self):
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started.is_set():
            raise RuntimeError("threads can only be started once")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                # NOTE(google) Added 'in' check. This handles the possibility of
                # an asynchronous exception (e.g., DeadlineExceededError) being
                # thrown inside of __bootstrap_inner after it removes self from
                # _limbo, causing a KeyError to be raised here. It is safe to
                # ignore such an error, because it means self has already been
                # removed from _limbo.
                if self in _limbo:
                    del _limbo[self]
            raise
        self.__started.wait()
threading.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def join(self, timeout=None):
        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        isAlive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.

        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()
thread.py 文件源码 项目:Lixiang_zhaoxin 作者: hejaxian 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def acquire(self, blocking=True, timeout=-1):
            # Transform the default -1 argument into the None that our
            # semaphore implementation expects, and raise the same error
            # the stdlib implementation does.
            if timeout == -1:
                timeout = None
            if not blocking and timeout is not None:
                raise ValueError("can't specify a timeout for a non-blocking call")
            if timeout is not None:
                if timeout < 0:
                    # in C: if(timeout < 0 && timeout != -1)
                    raise ValueError("timeout value must be strictly positive")
                if timeout > self._TIMEOUT_MAX:
                    raise OverflowError('timeout value is too large')

            return BoundedSemaphore.acquire(self, blocking, timeout)
recipe-578194.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def close(self):
        try:
            self._socket.close()
        except socket.error:
            pass
recipe-578194.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        try:
            self.run_server()
        except (EOFError, ValueError, socket.error), e:
            logger.error(e)

        try:
            self._socket.close()
        except socket.error:
            pass
recipe-578194.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def release(self, who, name):
        try:
            self.get_lock(who, name).release()
        except thread.error, e:
            raise ValueError('%s: cannot unlock %s: %s' % (who, name, e))

        logger.info('%s released %s', who, name)
recipe-578194.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def serve_forever():
    _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        host, port = ( os.environ.get('DLM') or sys.argv[1] ).split(':')
        port = int(port)
    except (IndexError, ValueError):
        host, port = 'localhost', 27272

    try:
        _socket.bind((host, port))
        _socket.listen(5)

        logger.info('Listening on %s:%s', host, port )

        while True:
            inputs =  [ _socket ]
            inputready, outputready, exceptready = select.select(inputs, [], [])

            [ LockServer(*stream.accept()) for stream in inputready ]
    finally:
        try:
            _socket.close()
            logger.info('Socket closed')
        except socket.error:
            pass
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_various_ops_small_stack(self):
        if verbose:
            print 'with 256kB thread stack size...'
        try:
            threading.stack_size(262144)
        except thread.error:
            self.skipTest('platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)

    # run with a large thread stack size (1MB)
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_various_ops_large_stack(self):
        if verbose:
            print 'with 1MB thread stack size...'
        try:
            threading.stack_size(0x100000)
        except thread.error:
            self.skipTest('platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        rc = p.returncode
        self.assertFalse(rc == 2, "interpreted was blocked")
        self.assertTrue(rc == 0,
                        "Unexpected error: " + repr(stderr))
test_threading.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RuntimeError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        expected_output = "end of main thread\n"
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        data = stdout.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0, "Unexpected error")
        self.assertEqual(data, expected_output)
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_various_ops_small_stack(self):
        if verbose:
            print 'with 256kB thread stack size...'
        try:
            threading.stack_size(262144)
        except thread.error:
            self.skipTest('platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)

    # run with a large thread stack size (1MB)
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_various_ops_large_stack(self):
        if verbose:
            print 'with 1MB thread stack size...'
        try:
            threading.stack_size(0x100000)
        except thread.error:
            self.skipTest('platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        rc = p.returncode
        self.assertFalse(rc == 2, "interpreted was blocked")
        self.assertTrue(rc == 0,
                        "Unexpected error: " + repr(stderr))
test_threading.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RuntimeError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        expected_output = "end of main thread\n"
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        data = stdout.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0, "Unexpected error")
        self.assertEqual(data, expected_output)
thread_tools.py 文件源码 项目:cpy2py 作者: maxfischer2781 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def put(self, item):
        """Put a single item in the queue"""
        with self._queue_mutex:
            self._queue_content.append(item)
            for w_idx in range(len(self._waiters)):
                try:
                    self._waiters[w_idx].release()
                except (ThreadError, RuntimeError, thread_error):
                    continue
                else:
                    break
test_threading.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_various_ops_small_stack(self):
        if verbose:
            print 'with 256kB thread stack size...'
        try:
            threading.stack_size(262144)
        except thread.error:
            self.skipTest('platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)

    # run with a large thread stack size (1MB)
test_threading.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_various_ops_large_stack(self):
        if verbose:
            print 'with 1MB thread stack size...'
        try:
            threading.stack_size(0x100000)
        except thread.error:
            self.skipTest('platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)
test_threading.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
test_threading.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        rc = p.returncode
        self.assertFalse(rc == 2, "interpreted was blocked")
        self.assertTrue(rc == 0,
                        "Unexpected error: " + repr(stderr))
test_threading.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RuntimeError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        expected_output = "end of main thread\n"
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        data = stdout.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0, "Unexpected error")
        self.assertEqual(data, expected_output)
test_threading.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_various_ops_small_stack(self):
        if verbose:
            print 'with 256kB thread stack size...'
        try:
            threading.stack_size(262144)
        except thread.error:
            if verbose:
                print 'platform does not support changing thread stack size'
            return
        self.test_various_ops()
        threading.stack_size(0)

    # run with a large thread stack size (1MB)


问题


面经


文章

微信
公众号

扫码关注公众号