def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module is already loaded, and stores ``for_attach`` in the
    module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # ``threading`` is patched only when someone else has already imported it
    # (the attach scenario), so that threads started through it are intercepted.
    #
    # It must never be imported from here for the first time: a fresh import
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach thread.
    # Patching is unnecessary in that case anyway; a later import of
    # ``threading`` picks up the ``thread.start_new_thread`` installed above.
    if _threading is None and 'threading' in sys.modules:
        import threading as loaded_threading
        _threading = loaded_threading
        loaded_threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
# python类_start_new_thread()的实例源码 (example source code for Python's _start_new_thread())
def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module is already loaded, and stores ``for_attach`` in the
    module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # ``threading`` is patched only when someone else has already imported it
    # (the attach scenario), so that threads started through it are intercepted.
    #
    # It must never be imported from here for the first time: a fresh import
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach thread.
    # Patching is unnecessary in that case anyway; a later import of
    # ``threading`` picks up the ``thread.start_new_thread`` installed above.
    if _threading is None and 'threading' in sys.modules:
        import threading as loaded_threading
        _threading = loaded_threading
        loaded_threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
## Modified parameters by Don Jayamanne
# Accept current Process id to pass back to debugger
def deferToThread(self, callback, func, *args, **kw):
    """
    Run a function on a background thread and hand its return value to a callback.

    The worker thread calls C{func(*args, **kw)} and then passes the result on
    with C{self.setTimeout(0, callback, ret)}; that call is made from the worker
    thread. If C{func} raises, C{callback} is never scheduled.

    @type callback: function
    @param callback: function to call on completion; receives the return value of C{func}
    @type func: function
    @param func: function to call in the worker thread
    @param args: positional arguments passed to C{func}
    @param kw: keyword arguments passed to C{func}
    """
    def run_and_report():
        ret = func(*args, **kw)
        self.setTimeout(0, callback, ret)

    # Use the public Thread API instead of the undocumented private alias
    # threading._start_new_thread, which is not guaranteed to exist on every
    # Python version. The daemon flag keeps the old behaviour of a raw thread:
    # it never blocks interpreter exit.
    worker = threading.Thread(target=run_and_report)
    worker.daemon = True
    worker.start()
####
# Scheduling
####
def deferToThread(self, callback, func, *args, **kw):
    """
    Run a function on a background thread and hand its return value to a callback.

    The worker thread calls C{func(*args, **kw)} and then passes the result on
    with C{self.setTimeout(0, callback, ret)}; that call is made from the worker
    thread. If C{func} raises, C{callback} is never scheduled.

    @type callback: function
    @param callback: function to call on completion; receives the return value of C{func}
    @type func: function
    @param func: function to call in the worker thread
    @param args: positional arguments passed to C{func}
    @param kw: keyword arguments passed to C{func}
    """
    def run_and_report():
        ret = func(*args, **kw)
        self.setTimeout(0, callback, ret)

    # Use the public Thread API instead of the undocumented private alias
    # threading._start_new_thread, which is not guaranteed to exist on every
    # Python version. The daemon flag keeps the old behaviour of a raw thread:
    # it never blocks interpreter exit.
    worker = threading.Thread(target=run_and_report)
    worker.daemon = True
    worker.start()
####
# Scheduling
####
def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module is already loaded, and stores ``for_attach`` in the
    module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # ``threading`` is patched only when someone else has already imported it
    # (the attach scenario), so that threads started through it are intercepted.
    #
    # It must never be imported from here for the first time: a fresh import
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach thread.
    # Patching is unnecessary in that case anyway; a later import of
    # ``threading`` picks up the ``thread.start_new_thread`` installed above.
    if _threading is None and 'threading' in sys.modules:
        import threading as loaded_threading
        _threading = loaded_threading
        loaded_threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module is already loaded, and stores ``for_attach`` in the
    module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # ``threading`` is patched only when someone else has already imported it
    # (the attach scenario), so that threads started through it are intercepted.
    #
    # It must never be imported from here for the first time: a fresh import
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach thread.
    # Patching is unnecessary in that case anyway; a later import of
    # ``threading`` picks up the ``thread.start_new_thread`` installed above.
    if _threading is None and 'threading' in sys.modules:
        import threading as loaded_threading
        _threading = loaded_threading
        loaded_threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module is already loaded, and stores ``for_attach`` in the
    module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # ``threading`` is patched only when someone else has already imported it
    # (the attach scenario), so that threads started through it are intercepted.
    #
    # It must never be imported from here for the first time: a fresh import
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach thread.
    # Patching is unnecessary in that case anyway; a later import of
    # ``threading`` picks up the ``thread.start_new_thread`` installed above.
    if _threading is None and 'threading' in sys.modules:
        import threading as loaded_threading
        _threading = loaded_threading
        loaded_threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module is already loaded, and stores ``for_attach`` in the
    module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # ``threading`` is patched only when someone else has already imported it
    # (the attach scenario), so that threads started through it are intercepted.
    #
    # It must never be imported from here for the first time: a fresh import
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach thread.
    # Patching is unnecessary in that case anyway; a later import of
    # ``threading`` picks up the ``thread.start_new_thread`` installed above.
    if _threading is None and 'threading' in sys.modules:
        import threading as loaded_threading
        _threading = loaded_threading
        loaded_threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
def intercept_threads(for_attach = False):
    """Route thread creation through ``thread_creator``.

    Rebinds ``thread.start_new_thread`` and ``thread.start_new`` to
    ``thread_creator``, hot-patches ``threading._start_new_thread`` when the
    ``threading`` module is already loaded, and stores ``for_attach`` in the
    module-level ``_INTERCEPTING_FOR_ATTACH`` flag.
    """
    global _threading, _INTERCEPTING_FOR_ATTACH

    thread.start_new_thread = thread.start_new = thread_creator

    # ``threading`` is patched only when someone else has already imported it
    # (the attach scenario), so that threads started through it are intercepted.
    #
    # It must never be imported from here for the first time: a fresh import
    # would treat the current thread as the main thread, which is wrong while
    # attaching because this code runs on a short-lived debugger attach thread.
    # Patching is unnecessary in that case anyway; a later import of
    # ``threading`` picks up the ``thread.start_new_thread`` installed above.
    if _threading is None and 'threading' in sys.modules:
        import threading as loaded_threading
        _threading = loaded_threading
        loaded_threading._start_new_thread = thread_creator

    _INTERCEPTING_FOR_ATTACH = for_attach
def thread_creator(func, args, kwargs = None, *extra_args):
    """Start ``func`` on a new thread by way of ``new_thread_wrapper``.

    Takes the same positional layout as ``start_new_thread``:
    ``(func, args[, kwargs])``. The actual thread is started with
    ``_start_new_thread(new_thread_wrapper, (func, args, kwargs))`` and that
    call's return value is returned unchanged.

    ``kwargs`` defaults to a fresh empty dict on every call. The previous
    ``{}`` default was one dict object shared by all calls and forwarded into
    every started thread.
    """
    if kwargs is None:
        kwargs = {}

    if not isinstance(args, tuple) and isinstance(kwargs, tuple):
        # args is not a tuple. This may be because we have become bound to a
        # class, which has offset our arguments by one: the real function is
        # in ``args``, the real argument tuple is in ``kwargs``, and the real
        # keyword dict (if any) is the first extra positional argument.
        func, args = args, kwargs
        kwargs = extra_args[0] if extra_args else {}

    return _start_new_thread(new_thread_wrapper, (func, args, kwargs))
def command_connect_repl(self):
    """Read an integer from ``self.conn`` and pass it, as the port number, to
    ``self.connect_to_repl_backend`` on a thread started with ``_start_new_thread``."""
    repl_port = read_int(self.conn)
    backend_args = (repl_port,)
    _start_new_thread(self.connect_to_repl_backend, backend_args)
def detach_process():
    """Mark the debugger as detached and undo the hooks it installed.

    Always sets the module-level ``DETACHED`` flag. Unless
    ``_INTERCEPTING_FOR_ATTACH`` is set, it also puts back the original
    ``sys.stdout``/``sys.stderr`` streams (when they are wrapped in
    ``_DebuggerOutput``) and restores ``thread.start_new_thread`` and
    ``thread.start_new`` to ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        return

    # Unwrap each standard stream that is currently a debugger output proxy.
    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = _start_new_thread
    thread.start_new = _start_new_thread
def connect_repl_using_socket(sock):
    """Pass ``sock`` to ``DebuggerLoop.instance.connect_to_repl_backend_using_socket``
    on a thread started with ``_start_new_thread``."""
    backend_entry = DebuggerLoop.instance.connect_to_repl_backend_using_socket
    _start_new_thread(backend_entry, (sock,))
def command_connect_repl(self):
    """Read an integer from ``self.conn`` and pass it, as the port number, to
    ``self.connect_to_repl_backend`` on a thread started with ``_start_new_thread``."""
    repl_port = read_int(self.conn)
    backend_args = (repl_port,)
    _start_new_thread(self.connect_to_repl_backend, backend_args)
def detach_process():
    """Mark the debugger as detached and undo the hooks it installed.

    Always sets the module-level ``DETACHED`` flag. Unless
    ``_INTERCEPTING_FOR_ATTACH`` is set, it also puts back the original
    ``sys.stdout``/``sys.stderr`` streams (when they are wrapped in
    ``_DebuggerOutput``) and restores ``thread.start_new_thread`` and
    ``thread.start_new`` to ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        return

    # Unwrap each standard stream that is currently a debugger output proxy.
    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = _start_new_thread
    thread.start_new = _start_new_thread
def connect_repl_using_socket(sock):
    """Pass ``sock`` to ``DebuggerLoop.instance.connect_to_repl_backend_using_socket``
    on a thread started with ``_start_new_thread``."""
    backend_entry = DebuggerLoop.instance.connect_to_repl_backend_using_socket
    _start_new_thread(backend_entry, (sock,))
def test_limbo_cleanup(self):
    """Issue 7481: a thread that fails to start must be removed from ``threading._limbo``."""
    def refuse_to_start(*args):
        raise threading.ThreadError()

    real_start_new_thread = threading._start_new_thread
    threading._start_new_thread = refuse_to_start
    try:
        doomed = threading.Thread(target=lambda: None)
        with self.assertRaises(threading.ThreadError):
            doomed.start()
        still_in_limbo = doomed in threading._limbo
        self.assertFalse(
            still_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Put the genuine starter back whatever the outcome of the checks.
        threading._start_new_thread = real_start_new_thread
def __init__(self, host='localhost', port=9420):
    """Connect a UDP socket to ``host:port`` and start ``self.loop`` on a new thread.

    Also initialises the command buffer, registers the callback through
    ``self.setup_callback(bpy.context)``, and creates the ``self.ready`` lock.
    The identifier returned when the thread is started is kept in ``self.ID``.

    ``host`` and ``port`` default to the previously hard-coded
    ``'localhost'`` and ``9420``.
    """
    import socket
    import threading
    import _thread

    self.buffer = []  # cmd buffer
    self.socket = sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
    sock.connect((host, port))
    print('SERVER: socket connected', sock)
    self._handle = None
    self.setup_callback( bpy.context )

    # Use the documented APIs rather than the private threading._allocate_lock
    # and threading._start_new_thread aliases, which are not guaranteed to
    # exist on every Python version. _thread.start_new_thread returns the
    # thread identifier, just as the alias did.
    self.ready = threading.Lock()
    self.ID = _thread.start_new_thread(
        self.loop, (None,)
    )
    print( 'SERVER: thread started')
def test_limbo_cleanup(self):
    """Issue 7481: a thread that fails to start must be removed from ``threading._limbo``."""
    def refuse_to_start(*args):
        raise thread.error()

    real_start_new_thread = threading._start_new_thread
    threading._start_new_thread = refuse_to_start
    try:
        doomed = threading.Thread(target=lambda: None)
        with self.assertRaises(thread.error):
            doomed.start()
        still_in_limbo = doomed in threading._limbo
        self.assertFalse(
            still_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Put the genuine starter back whatever the outcome of the checks.
        threading._start_new_thread = real_start_new_thread
def test_limbo_cleanup(self):
    """Issue 7481: a thread that fails to start must be removed from ``threading._limbo``."""
    def refuse_to_start(*args):
        raise thread.error()

    real_start_new_thread = threading._start_new_thread
    threading._start_new_thread = refuse_to_start
    try:
        doomed = threading.Thread(target=lambda: None)
        with self.assertRaises(thread.error):
            doomed.start()
        still_in_limbo = doomed in threading._limbo
        self.assertFalse(
            still_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Put the genuine starter back whatever the outcome of the checks.
        threading._start_new_thread = real_start_new_thread
def parent():
    """Start one ``child`` thread per loop iteration until the user enters ``q``.

    Each iteration passes an increasing counter to ``child``, prints a marker
    line, and then waits for a line of input.
    """
    i = 0
    while True:
        i += 1
        # Public Thread API instead of the undocumented private alias
        # threading._start_new_thread. The daemon flag keeps the behaviour of
        # a raw thread, which never blocks interpreter exit.
        worker = threading.Thread(target=child, args=(i,))
        worker.daemon = True
        worker.start()
        print('parent.....')
        if input() == 'q':
            break
def parent():
    """Start three threads, each showing a different way to supply the thread target."""
    def spawn(target, args=()):
        # Public Thread API instead of the undocumented private alias
        # threading._start_new_thread. The daemon flag keeps the behaviour of
        # a raw thread, which never blocks interpreter exit.
        worker = threading.Thread(target=target, args=args)
        worker.daemon = True
        worker.start()

    # Plain function plus an argument tuple.
    spawn(action, (3,))
    # Zero-argument lambda that supplies the argument itself.
    spawn(lambda: action(2))
    # Bound method of an object.
    obj = Power(1)
    spawn(obj.action)
def test_message_is_not_thread_safe(self):
    """Process RPC calls from two threads at once and check each call sees its own message.

    The main thread calls ``ping2`` and a worker thread calls ``ping3``, each
    10000 times with a distinct payload; every result must equal the payload of
    the message passed with that call.

    Assertion failures raised in the worker thread are collected and re-raised
    in the test thread after the worker has been joined. Otherwise they would
    never reach the test runner, and the test could finish while the worker is
    still running.
    """
    import threading

    class SpoofMessage:
        # Minimal stand-in carrying only ``channel.name`` and ``payload``.
        class Channel:
            def __init__(self):
                self.name = None

        def __init__(self):
            self.channel = SpoofMessage.Channel()
            self.payload = None

    @MyJsonRpcWebsocketConsumerTest.rpc_method()
    def ping2(**kwargs):
        original_message = kwargs["original_message"]
        return original_message.payload

    @MyJsonRpcWebsocketConsumerTest.rpc_method()
    def ping3(**kwargs):
        original_message = kwargs["original_message"]
        return original_message.payload

    def exercise(method_name):
        # Issue 10000 calls to ``method_name``, each with its own payload.
        for i in range(0, 10000):
            message = SpoofMessage()
            message.channel.name = "websocket.test"
            message.payload = "test%s" % i
            res = MyJsonRpcWebsocketConsumerTest._JsonRpcConsumer__process(
                {"id": 1, "jsonrpc": "2.0", "method": method_name, "params": []}, message)
            self.assertEqual(res['result'], "test%s" % i)

    worker_errors = []

    def thread_test():
        try:
            exercise("ping3")
        except Exception as exc:
            # Kept for the test thread; an exception escaping here would be lost.
            worker_errors.append(exc)

    worker = threading.Thread(target=thread_test)
    worker.start()
    try:
        exercise("ping2")
    finally:
        worker.join()

    if worker_errors:
        raise worker_errors[0]
def test_limbo_cleanup(self):
    """Issue 7481: a thread that fails to start must be removed from ``threading._limbo``."""
    def refuse_to_start(*args):
        raise threading.ThreadError()

    real_start_new_thread = threading._start_new_thread
    threading._start_new_thread = refuse_to_start
    try:
        doomed = threading.Thread(target=lambda: None)
        with self.assertRaises(threading.ThreadError):
            doomed.start()
        still_in_limbo = doomed in threading._limbo
        self.assertFalse(
            still_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Put the genuine starter back whatever the outcome of the checks.
        threading._start_new_thread = real_start_new_thread
def test_limbo_cleanup(self):
    """Issue 7481: a thread that fails to start must be removed from ``threading._limbo``."""
    def refuse_to_start(*args):
        raise thread.error()

    real_start_new_thread = threading._start_new_thread
    threading._start_new_thread = refuse_to_start
    try:
        doomed = threading.Thread(target=lambda: None)
        with self.assertRaises(thread.error):
            doomed.start()
        still_in_limbo = doomed in threading._limbo
        self.assertFalse(
            still_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Put the genuine starter back whatever the outcome of the checks.
        threading._start_new_thread = real_start_new_thread
def thread_creator(func, args, kwargs = None, *extra_args):
    """Start ``func`` on a new thread by way of ``new_thread_wrapper``.

    Takes the same positional layout as ``start_new_thread``:
    ``(func, args[, kwargs])``. The actual thread is started with
    ``_start_new_thread(new_thread_wrapper, (func, args, kwargs))`` and that
    call's return value is returned unchanged.

    ``kwargs`` defaults to a fresh empty dict on every call. The previous
    ``{}`` default was one dict object shared by all calls and forwarded into
    every started thread.
    """
    if kwargs is None:
        kwargs = {}

    if not isinstance(args, tuple) and isinstance(kwargs, tuple):
        # args is not a tuple. This may be because we have become bound to a
        # class, which has offset our arguments by one: the real function is
        # in ``args``, the real argument tuple is in ``kwargs``, and the real
        # keyword dict (if any) is the first extra positional argument.
        func, args = args, kwargs
        kwargs = extra_args[0] if extra_args else {}

    return _start_new_thread(new_thread_wrapper, (func, args, kwargs))
def command_connect_repl(self):
    """Read an integer from ``self.conn`` and pass it, as the port number, to
    ``self.connect_to_repl_backend`` on a thread started with ``_start_new_thread``."""
    repl_port = read_int(self.conn)
    backend_args = (repl_port,)
    _start_new_thread(self.connect_to_repl_backend, backend_args)
def detach_process():
    """Mark the debugger as detached and undo the hooks it installed.

    Always sets the module-level ``DETACHED`` flag. Unless
    ``_INTERCEPTING_FOR_ATTACH`` is set, it also puts back the original
    ``sys.stdout``/``sys.stderr`` streams (when they are wrapped in
    ``_DebuggerOutput``) and restores ``thread.start_new_thread`` and
    ``thread.start_new`` to ``_start_new_thread``.
    """
    global DETACHED
    DETACHED = True

    if _INTERCEPTING_FOR_ATTACH:
        return

    # Unwrap each standard stream that is currently a debugger output proxy.
    for stream_name in ('stdout', 'stderr'):
        stream = getattr(sys, stream_name)
        if isinstance(stream, _DebuggerOutput):
            setattr(sys, stream_name, stream.old_out)

    thread.start_new_thread = _start_new_thread
    thread.start_new = _start_new_thread
def connect_repl_using_socket(sock):
    """Pass ``sock`` to ``DebuggerLoop.instance.connect_to_repl_backend_using_socket``
    on a thread started with ``_start_new_thread``."""
    backend_entry = DebuggerLoop.instance.connect_to_repl_backend_using_socket
    _start_new_thread(backend_entry, (sock,))
def test_limbo_cleanup(self):
    """Issue 7481: a thread that fails to start must be removed from ``threading._limbo``."""
    def refuse_to_start(*args):
        raise threading.ThreadError()

    real_start_new_thread = threading._start_new_thread
    threading._start_new_thread = refuse_to_start
    try:
        doomed = threading.Thread(target=lambda: None)
        with self.assertRaises(threading.ThreadError):
            doomed.start()
        still_in_limbo = doomed in threading._limbo
        self.assertFalse(
            still_in_limbo,
            "Failed to cleanup _limbo map on failure of Thread.start().")
    finally:
        # Put the genuine starter back whatever the outcome of the checks.
        threading._start_new_thread = real_start_new_thread