def test_limbo_cleanup(self):
    """Issue 7481: a failed Thread.start() must not leave the thread in _limbo.

    threading._start_new_thread is temporarily replaced by a function that
    always raises thread.error, and is put back in the finally clause.
    """
    def always_fail(*args):
        raise thread.error()

    saved_starter = threading._start_new_thread
    threading._start_new_thread = always_fail
    try:
        worker = threading.Thread(target=lambda: None)
        with self.assertRaises(thread.error):
            worker.start()
        still_in_limbo = worker in threading._limbo
        self.assertFalse(
            still_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Always restore the real starter, even when an assertion fails.
        threading._start_new_thread = saved_starter
# Example source code using Python's _start_new_thread()
def thread_creator(func, args, kwargs = {}):
    """Start ``func`` on a new thread by way of ``new_thread_wrapper``.

    ``func`` is prepended to ``args`` and the resulting tuple, together with
    ``kwargs``, is handed to ``_start_new_thread`` with ``new_thread_wrapper``
    as the thread entry point.

    Returns whatever ``_start_new_thread`` returns.
    """
    wrapper_args = (func, ) + args
    return _start_new_thread(new_thread_wrapper, wrapper_args, kwargs)
def command_connect_repl(self):
    """Read a port number from ``self.conn`` and connect to the REPL backend.

    The connection attempt (``self.connect_to_repl_backend``) is run on a new
    thread with the port number as its only argument.
    """
    repl_port = read_int(self.conn)
    thread_args = (repl_port,)
    _start_new_thread(self.connect_to_repl_backend, thread_args)
def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Replaces ``thread.start_new_thread`` and ``thread.start_new`` with
    ``thread_creator``, patches ``threading._start_new_thread`` the first time
    the module-level ``threading`` name is still ``None``, and records
    ``for_attach`` in the ``_INTERCEPTING_FOR_ATTACH`` global.
    """
    global threading, _INTERCEPTING_FOR_ATTACH

    for hook_name in ('start_new_thread', 'start_new'):
        setattr(thread, hook_name, thread_creator)

    if threading is None:
        # we need to patch threading._start_new_thread so that
        # we pick up new threads in the attach case when threading
        # is already imported.
        import threading
        threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
def detach_process():
    """Flag the process as detached and undo the debugger's patches.

    ``DETACHED`` is always set to ``True``. Unless ``_INTERCEPTING_FOR_ATTACH``
    is set, any ``_DebuggerOutput`` installed as ``sys.stdout``/``sys.stderr``
    is replaced by its ``old_out`` stream, and ``thread.start_new_thread`` /
    ``thread.start_new`` are pointed back at ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        # Nothing is unpatched while intercepting for attach.
        return

    if isinstance(sys.stdout, _DebuggerOutput):
        sys.stdout = sys.stdout.old_out
    if isinstance(sys.stderr, _DebuggerOutput):
        sys.stderr = sys.stderr.old_out

    thread.start_new_thread = _start_new_thread
    thread.start_new = _start_new_thread
def start_stream_threads(self):
    """Run ``self.start_stream`` on a new thread, with no arguments."""
    no_args = ()
    threading._start_new_thread(self.start_stream, no_args)
def thread_creator(func, args, kwargs = {}, *extra_args):
    """Start ``func`` on a new thread by way of ``new_thread_wrapper``.

    ``new_thread_wrapper`` is started through ``_start_new_thread`` and
    receives ``(func, args, kwargs)`` as its arguments.

    If ``args`` is not a tuple but ``kwargs`` is, the arguments are assumed to
    be shifted by one position (which may happen when this function has become
    bound to a class); they are shifted back before the thread is started.

    Returns whatever ``_start_new_thread`` returns.
    """
    if not isinstance(args, tuple) and isinstance(kwargs, tuple):
        # Shifted call: the real func/args sit one slot to the right, and the
        # real kwargs (if any) is the first extra positional argument.
        func, args = args, kwargs
        kwargs = extra_args[0] if extra_args else {}

    return _start_new_thread(new_thread_wrapper, (func, args, kwargs))
def command_connect_repl(self):
    """Handle the connect-REPL command.

    Reads an integer port from ``self.conn`` and starts a new thread that
    calls ``self.connect_to_repl_backend`` with that port.
    """
    backend_port = read_int(self.conn)
    _start_new_thread(self.connect_to_repl_backend, (backend_port,))
def detach_process():
    """Set ``DETACHED`` and, unless intercepting for attach, undo patches.

    When ``_INTERCEPTING_FOR_ATTACH`` is false, each of ``sys.stdout`` and
    ``sys.stderr`` that is a ``_DebuggerOutput`` is swapped for its
    ``old_out`` attribute, and the ``thread`` module's ``start_new_thread``
    and ``start_new`` are set to ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if not _INTERCEPTING_FOR_ATTACH:
        for stream_name in ('stdout', 'stderr'):
            stream = getattr(sys, stream_name)
            if isinstance(stream, _DebuggerOutput):
                setattr(sys, stream_name, stream.old_out)

        thread.start_new_thread = thread.start_new = _start_new_thread
def connect_repl_using_socket(sock):
    """Start a thread that connects the REPL backend over ``sock``.

    The thread runs ``DebuggerLoop.instance.connect_to_repl_backend_using_socket``
    with ``sock`` as its only argument.
    """
    connect = DebuggerLoop.instance.connect_to_repl_backend_using_socket
    _start_new_thread(connect, (sock,))
def thread_creator(func, args, kwargs = {}, *extra_args):
    """Launch ``new_thread_wrapper`` on a new thread for ``func``.

    The wrapper is given ``(func, args, kwargs)``. A call whose ``args`` is
    not a tuple while ``kwargs`` is one is treated as offset by one argument
    (this may be because the function has become bound to a class) and is
    realigned first.

    Returns the result of ``_start_new_thread``.
    """
    offset_by_one = not isinstance(args, tuple) and isinstance(kwargs, tuple)
    if offset_by_one:
        func, args = args, kwargs
        if extra_args:
            kwargs = extra_args[0]
        else:
            kwargs = {}

    wrapper_args = (func, args, kwargs)
    return _start_new_thread(new_thread_wrapper, wrapper_args)
def command_connect_repl(self):
    """Start a REPL-backend connection thread for a port read from the wire.

    The port is obtained with ``read_int(self.conn)`` and passed to
    ``self.connect_to_repl_backend`` on a new thread.
    """
    port = read_int(self.conn)
    connect_args = (port,)
    _start_new_thread(self.connect_to_repl_backend, connect_args)
def detach_process():
    """Record that the debugger has detached and remove its hooks.

    Sets the ``DETACHED`` global to ``True``. If ``_INTERCEPTING_FOR_ATTACH``
    is false, ``sys.stdout`` and ``sys.stderr`` are replaced by their
    ``old_out`` attribute when they are ``_DebuggerOutput`` instances, and
    ``thread.start_new_thread`` / ``thread.start_new`` are reset to
    ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if not _INTERCEPTING_FOR_ATTACH:
        # Put back the output streams the debugger wrapped.
        out_stream = sys.stdout
        if isinstance(out_stream, _DebuggerOutput):
            sys.stdout = out_stream.old_out
        err_stream = sys.stderr
        if isinstance(err_stream, _DebuggerOutput):
            sys.stderr = err_stream.old_out

        # Stop routing new threads through the debugger.
        thread.start_new_thread = _start_new_thread
        thread.start_new = _start_new_thread
def connect_repl_using_socket(sock):
    """Run ``DebuggerLoop.instance.connect_to_repl_backend_using_socket(sock)``
    on a new thread."""
    thread_args = (sock,)
    _start_new_thread(
        DebuggerLoop.instance.connect_to_repl_backend_using_socket,
        thread_args,
    )
def test_limbo_cleanup(self):
    """Issue 7481: failure to start a thread should clean up the limbo map.

    threading._start_new_thread is swapped for a stub that raises
    threading.ThreadError; the real function is restored afterwards.
    """
    def raising_starter(*args):
        raise threading.ThreadError()

    real_starter = threading._start_new_thread
    threading._start_new_thread = raising_starter
    try:
        thread_under_test = threading.Thread(target=lambda: None)
        self.assertRaises(threading.ThreadError, thread_under_test.start)
        left_in_limbo = thread_under_test in threading._limbo
        self.assertFalse(
            left_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Restore the real starter regardless of the test outcome.
        threading._start_new_thread = real_starter
def __init__(self):
    """Launch the Tundra process, connect a UDP socket to it and start the loop thread.

    Initializes the scene/object/material bookkeeping, starts Tundra as a
    subprocess (``run-server.sh`` on Linux, ``Tundra.exe`` otherwise),
    connects a datagram socket to localhost:9978, registers the callback via
    ``self.setup_callback`` and finally runs ``self.loop`` on a new thread.

    Raises:
        AssertionError: if the Tundra executable is not found under
            ``CONFIG_TUNDRA``.
    """
    self._scene_loaded = False
    self._objects = {}
    self._materials = {}
    self.buffer = []  # cmd buffer
    self.callbacks = [self.update_view, self.update_selected, self.update_materials]

    ## launch Tundra ##
    # sys.platform is 'linux2' on Python 2 but plain 'linux' on Python 3.3+,
    # so match on the prefix instead of comparing against 'linux2'.
    if sys.platform.startswith('linux'):
        exe = os.path.join(CONFIG_TUNDRA, 'run-server.sh')
        assert os.path.isfile(exe)
        cmd = [exe, '--config', TUNDRA_CONFIG_XML_PATH, '--fpslimit', '100', '--storage', '/tmp/']
        print(cmd)
    else:
        exe = os.path.join(CONFIG_TUNDRA, 'Tundra.exe')
        assert os.path.isfile(exe)
        cmd = [exe, '--file', PREVIEW, '--config', TUNDRA_CONFIG_XML_PATH]
    self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)

    self.socket = sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    host = 'localhost'
    port = 9978
    sock.connect((host, port))
    print('socket connected', sock)

    self._handle = None
    self.setup_callback(bpy.context)

    self.ready = threading._allocate_lock()
    # self.loop is started with a single None argument.
    self.ID = threading._start_new_thread(
        self.loop, (None,)
    )
    print('.....thread started......')
def __init__(self):
    """Connect a UDP socket to localhost:9420 and start the loop thread.

    Creates an empty command buffer, connects the datagram socket, registers
    the callback through ``self.setup_callback`` and runs ``self.loop`` on a
    new thread with a single ``None`` argument.
    """
    import socket

    self.buffer = []  # cmd buffer

    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
    self.socket = udp_socket
    address = ('localhost', 9420)
    udp_socket.connect(address)
    print('SERVER: socket connected', udp_socket)

    self._handle = None
    self.setup_callback(bpy.context)

    import threading

    self.ready = threading._allocate_lock()
    loop_args = (None,)
    self.ID = threading._start_new_thread(self.loop, loop_args)
    print( 'SERVER: thread started')
def thread_creator(func, args, kwargs = {}, *extra_args):
    """Start a thread that runs ``new_thread_wrapper(func, args, kwargs)``.

    When ``args`` is not a tuple and ``kwargs`` is, the call is taken to be
    shifted by one argument (this may be because the function has become
    bound to a class): ``args`` becomes the function, ``kwargs`` becomes the
    argument tuple, and the keyword dict is taken from the first extra
    positional argument, or is empty if there is none.

    Returns the value returned by ``_start_new_thread``.
    """
    args_is_tuple = isinstance(args, tuple)
    if not args_is_tuple:
        if isinstance(kwargs, tuple):
            # Realign the shifted arguments.
            func, args = args, kwargs
            kwargs = extra_args[0] if extra_args else {}

    return _start_new_thread(new_thread_wrapper, (func, args, kwargs))
def command_connect_repl(self):
    """Read an integer port from ``self.conn`` and hand it to
    ``self.connect_to_repl_backend`` on a newly started thread."""
    requested_port = read_int(self.conn)
    _start_new_thread(self.connect_to_repl_backend, (requested_port,))
def detach_process():
    """Mark the process as detached and, when allowed, remove the hooks.

    ``DETACHED`` becomes ``True`` unconditionally. The remaining work is
    skipped while ``_INTERCEPTING_FOR_ATTACH`` is set: swapping
    ``_DebuggerOutput`` instances on ``sys.stdout``/``sys.stderr`` for their
    ``old_out`` stream, and resetting ``thread.start_new_thread`` and
    ``thread.start_new`` to ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    keep_hooks = _INTERCEPTING_FOR_ATTACH
    if keep_hooks:
        return

    if isinstance(sys.stdout, _DebuggerOutput):
        sys.stdout = sys.stdout.old_out
    if isinstance(sys.stderr, _DebuggerOutput):
        sys.stderr = sys.stderr.old_out

    thread.start_new_thread = _start_new_thread
    thread.start_new = _start_new_thread