def acquire(self, blocking=True, timeout=-1):
# Transform the default -1 argument into the None that our
# semaphore implementation expects, and raise the same error
# the stdlib implementation does.
if timeout == -1:
timeout = None
if not blocking and timeout is not None:
raise ValueError("can't specify a timeout for a non-blocking call")
if timeout is not None:
if timeout < 0:
# in C: if(timeout < 0 && timeout != -1)
raise ValueError("timeout value must be strictly positive")
if timeout > self._TIMEOUT_MAX:
raise OverflowError('timeout value is too large')
return BoundedSemaphore.acquire(self, blocking, timeout)
python类error()的实例源码
def _run_and_join(self, script):
script = """if 1:
import sys, os, time, threading
# a thread, which waits for the main program to terminate
def joiningfunc(mainthread):
mainthread.join()
print 'end of thread'
\n""" + script
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().replace('\r', '')
p.stdout.close()
self.assertEqual(data, "end of main\nend of thread\n")
self.assertFalse(rc == 2, "interpreter was blocked")
self.assertTrue(rc == 0, "Unexpected error")
def _run_and_join(self, script):
script = """if 1:
import sys, os, time, threading
# a thread, which waits for the main program to terminate
def joiningfunc(mainthread):
mainthread.join()
print 'end of thread'
\n""" + script
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().replace('\r', '')
p.stdout.close()
self.assertEqual(data, "end of main\nend of thread\n")
self.assertFalse(rc == 2, "interpreter was blocked")
self.assertTrue(rc == 0, "Unexpected error")
def _run_and_join(self, script):
script = """if 1:
import sys, os, time, threading
# a thread, which waits for the main program to terminate
def joiningfunc(mainthread):
mainthread.join()
print 'end of thread'
# stdout is fully buffered because not a tty, we have to flush
# before exit.
sys.stdout.flush()
\n""" + script
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().replace('\r', '')
p.stdout.close()
self.assertEqual(data, "end of main\nend of thread\n")
self.assertFalse(rc == 2, "interpreter was blocked")
self.assertTrue(rc == 0, "Unexpected error")
def _run_and_join(self, script):
script = """if 1:
import sys, os, time, threading
# a thread, which waits for the main program to terminate
def joiningfunc(mainthread):
mainthread.join()
print 'end of thread'
\n""" + script
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
rc = p.wait()
data = p.stdout.read().replace('\r', '')
p.stdout.close()
self.assertEqual(data, "end of main\nend of thread\n")
self.assertFalse(rc == 2, "interpreter was blocked")
self.assertTrue(rc == 0, "Unexpected error")
def wait(self, timeout=None):
self.__cond.acquire()
try:
if not self.__flag:
self.__cond.wait(timeout)
return self.__flag
finally:
# NOTE(google) Added the try/except. This handles the possibility of
# an asynchronous exception (e.g., DeadlineExceededError) being
# thrown inside of Condition.wait such that it does not re-acquire
# the lock, causing a ThreadError 'release unlocked lock' to be
# raised by Condition.release.
# It is safe to ignore such an error, because it means the lock is
# already released.
try:
self.__cond.release()
except ThreadError:
pass
# Helper to generate new thread names
def start(self):
if not self.__initialized:
raise RuntimeError("thread.__init__() not called")
if self.__started.is_set():
raise RuntimeError("threads can only be started once")
if __debug__:
self._note("%s.start(): starting thread", self)
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self.__bootstrap, ())
except Exception:
with _active_limbo_lock:
# NOTE(google) Added 'in' check. This handles the possibility of
# an asynchronous exception (e.g., DeadlineExceededError) being
# thrown inside of __bootstrap_inner after it removes self from
# _limbo, causing a KeyError to be raised here. It is safe to
# ignore such an error, because it means self has already been
# removed from _limbo.
if self in _limbo:
del _limbo[self]
raise
self.__started.wait()
def join(self, timeout=None):
"""Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is
called terminates -- either normally or through an unhandled exception
or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
isAlive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current
thread as that would cause a deadlock. It is also an error to join() a
thread before it has been started and attempts to do so raises the same
exception.
"""
if not self.__initialized:
raise RuntimeError("Thread.__init__() not called")
if not self.__started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if __debug__:
if not self.__stopped:
self._note("%s.join(): waiting until thread stops", self)
self.__block.acquire()
try:
if timeout is None:
while not self.__stopped:
self.__block.wait()
if __debug__:
self._note("%s.join(): thread stopped", self)
else:
deadline = _time() + timeout
while not self.__stopped:
delay = deadline - _time()
if delay <= 0:
if __debug__:
self._note("%s.join(): timed out", self)
break
self.__block.wait(delay)
else:
if __debug__:
self._note("%s.join(): thread stopped", self)
finally:
self.__block.release()
def acquire(self, blocking=True, timeout=-1):
# Transform the default -1 argument into the None that our
# semaphore implementation expects, and raise the same error
# the stdlib implementation does.
if timeout == -1:
timeout = None
if not blocking and timeout is not None:
raise ValueError("can't specify a timeout for a non-blocking call")
if timeout is not None:
if timeout < 0:
# in C: if(timeout < 0 && timeout != -1)
raise ValueError("timeout value must be strictly positive")
if timeout > self._TIMEOUT_MAX:
raise OverflowError('timeout value is too large')
return BoundedSemaphore.acquire(self, blocking, timeout)
def close(self):
try:
self._socket.close()
except socket.error:
pass
def run(self):
try:
self.run_server()
except (EOFError, ValueError, socket.error), e:
logger.error(e)
try:
self._socket.close()
except socket.error:
pass
def release(self, who, name):
try:
self.get_lock(who, name).release()
except thread.error, e:
raise ValueError('%s: cannot unlock %s: %s' % (who, name, e))
logger.info('%s released %s', who, name)
def serve_forever():
_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
host, port = ( os.environ.get('DLM') or sys.argv[1] ).split(':')
port = int(port)
except (IndexError, ValueError):
host, port = 'localhost', 27272
try:
_socket.bind((host, port))
_socket.listen(5)
logger.info('Listening on %s:%s', host, port )
while True:
inputs = [ _socket ]
inputready, outputready, exceptready = select.select(inputs, [], [])
[ LockServer(*stream.accept()) for stream in inputready ]
finally:
try:
_socket.close()
logger.info('Socket closed')
except socket.error:
pass
def test_various_ops_small_stack(self):
if verbose:
print 'with 256kB thread stack size...'
try:
threading.stack_size(262144)
except thread.error:
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
# run with a large thread stack size (1MB)
def test_various_ops_large_stack(self):
if verbose:
print 'with 1MB thread stack size...'
try:
threading.stack_size(0x100000)
except thread.error:
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
raise thread.error()
_start_new_thread = threading._start_new_thread
threading._start_new_thread = fail_new_thread
try:
t = threading.Thread(target=lambda: None)
self.assertRaises(thread.error, t.start)
self.assertFalse(
t in threading._limbo,
"Failed to cleanup _limbo map on failure of Thread.start().")
finally:
threading._start_new_thread = _start_new_thread
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, threading
# A deadlock-killer, to prevent the
# testsuite to hang forever
def killer():
import os, time
time.sleep(2)
print 'program blocked; aborting'
os._exit(2)
t = threading.Thread(target=killer)
t.daemon = True
t.start()
# This is the trace function
def func(frame, event, arg):
threading.current_thread()
return func
sys.settrace(func)
"""],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
stdout, stderr = p.communicate()
rc = p.returncode
self.assertFalse(rc == 2, "interpreted was blocked")
self.assertTrue(rc == 0,
"Unexpected error: " + repr(stderr))
def test_recursion_limit(self):
# Issue 9670
# test that excessive recursion within a non-main thread causes
# an exception rather than crashing the interpreter on platforms
# like Mac OS X or FreeBSD which have small default stack sizes
# for threads
script = """if True:
import threading
def recurse():
return recurse()
def outer():
try:
recurse()
except RuntimeError:
pass
w = threading.Thread(target=outer)
w.start()
w.join()
print('end of main thread')
"""
expected_output = "end of main thread\n"
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
data = stdout.decode().replace('\r', '')
self.assertEqual(p.returncode, 0, "Unexpected error")
self.assertEqual(data, expected_output)
def test_various_ops_small_stack(self):
if verbose:
print 'with 256kB thread stack size...'
try:
threading.stack_size(262144)
except thread.error:
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
# run with a large thread stack size (1MB)
def test_various_ops_large_stack(self):
if verbose:
print 'with 1MB thread stack size...'
try:
threading.stack_size(0x100000)
except thread.error:
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
raise thread.error()
_start_new_thread = threading._start_new_thread
threading._start_new_thread = fail_new_thread
try:
t = threading.Thread(target=lambda: None)
self.assertRaises(thread.error, t.start)
self.assertFalse(
t in threading._limbo,
"Failed to cleanup _limbo map on failure of Thread.start().")
finally:
threading._start_new_thread = _start_new_thread
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, threading
# A deadlock-killer, to prevent the
# testsuite to hang forever
def killer():
import os, time
time.sleep(2)
print 'program blocked; aborting'
os._exit(2)
t = threading.Thread(target=killer)
t.daemon = True
t.start()
# This is the trace function
def func(frame, event, arg):
threading.current_thread()
return func
sys.settrace(func)
"""],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
stdout, stderr = p.communicate()
rc = p.returncode
self.assertFalse(rc == 2, "interpreted was blocked")
self.assertTrue(rc == 0,
"Unexpected error: " + repr(stderr))
def test_recursion_limit(self):
# Issue 9670
# test that excessive recursion within a non-main thread causes
# an exception rather than crashing the interpreter on platforms
# like Mac OS X or FreeBSD which have small default stack sizes
# for threads
script = """if True:
import threading
def recurse():
return recurse()
def outer():
try:
recurse()
except RuntimeError:
pass
w = threading.Thread(target=outer)
w.start()
w.join()
print('end of main thread')
"""
expected_output = "end of main thread\n"
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
data = stdout.decode().replace('\r', '')
self.assertEqual(p.returncode, 0, "Unexpected error")
self.assertEqual(data, expected_output)
def put(self, item):
"""Put a single item in the queue"""
with self._queue_mutex:
self._queue_content.append(item)
for w_idx in range(len(self._waiters)):
try:
self._waiters[w_idx].release()
except (ThreadError, RuntimeError, thread_error):
continue
else:
break
def test_various_ops_small_stack(self):
if verbose:
print 'with 256kB thread stack size...'
try:
threading.stack_size(262144)
except thread.error:
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
# run with a large thread stack size (1MB)
def test_various_ops_large_stack(self):
if verbose:
print 'with 1MB thread stack size...'
try:
threading.stack_size(0x100000)
except thread.error:
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
raise thread.error()
_start_new_thread = threading._start_new_thread
threading._start_new_thread = fail_new_thread
try:
t = threading.Thread(target=lambda: None)
self.assertRaises(thread.error, t.start)
self.assertFalse(
t in threading._limbo,
"Failed to cleanup _limbo map on failure of Thread.start().")
finally:
threading._start_new_thread = _start_new_thread
def test_finalize_with_trace(self):
# Issue1733757
# Avoid a deadlock when sys.settrace steps into threading._shutdown
p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, threading
# A deadlock-killer, to prevent the
# testsuite to hang forever
def killer():
import os, time
time.sleep(2)
print 'program blocked; aborting'
os._exit(2)
t = threading.Thread(target=killer)
t.daemon = True
t.start()
# This is the trace function
def func(frame, event, arg):
threading.current_thread()
return func
sys.settrace(func)
"""],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
stdout, stderr = p.communicate()
rc = p.returncode
self.assertFalse(rc == 2, "interpreted was blocked")
self.assertTrue(rc == 0,
"Unexpected error: " + repr(stderr))
def test_recursion_limit(self):
# Issue 9670
# test that excessive recursion within a non-main thread causes
# an exception rather than crashing the interpreter on platforms
# like Mac OS X or FreeBSD which have small default stack sizes
# for threads
script = """if True:
import threading
def recurse():
return recurse()
def outer():
try:
recurse()
except RuntimeError:
pass
w = threading.Thread(target=outer)
w.start()
w.join()
print('end of main thread')
"""
expected_output = "end of main thread\n"
p = subprocess.Popen([sys.executable, "-c", script],
stdout=subprocess.PIPE)
stdout, stderr = p.communicate()
data = stdout.decode().replace('\r', '')
self.assertEqual(p.returncode, 0, "Unexpected error")
self.assertEqual(data, expected_output)
def test_various_ops_small_stack(self):
if verbose:
print 'with 256kB thread stack size...'
try:
threading.stack_size(262144)
except thread.error:
if verbose:
print 'platform does not support changing thread stack size'
return
self.test_various_ops()
threading.stack_size(0)
# run with a large thread stack size (1MB)