def __init__(self, graph=None):
"""
Construct the graph object.
Parameters
----------
graph : networkx.DiGraph or NNGraph
Graph to build the object from (optional).
"""
super(Graph, self).__init__()
# Privates
self._thread_to_graph_mapping = {}
self._creator_thread = threading.get_ident()
self._creator_pid = mp.current_process().pid
# Publics
if graph is not None:
self.graph = graph
else:
self.graph = NNGraph()
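# A minimal standalone sketch (not part of the class above; `created_here` is
# a hypothetical helper) of the check that recording the creator thread ident
# and pid makes possible: later calls can detect whether they are running on
# the creating thread and in the creating process.
import multiprocessing as mp
import threading

_creator_thread = threading.get_ident()
_creator_pid = mp.current_process().pid

def created_here():
    return (threading.get_ident() == _creator_thread
            and mp.current_process().pid == _creator_pid)

print(created_here())  # True on the creating thread, False from other threads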
def _recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
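# Minimal usage sketch for the decorator above (`Box` is a hypothetical class,
# not part of the original source): a container that holds itself prints the
# fill value instead of recursing forever.
class Box(list):
    @_recursive_repr()
    def __repr__(self):
        return 'Box(%s)' % list.__repr__(self)

box = Box()
box.append(box)     # the box now contains itself
print(repr(box))    # Box([...]) -- the recursive inner call returns '...'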
def async_test_scarlett_os(loop):
"""Return a ScarlettOS object pointing at test config dir."""
# loop._thread_ident = threading.get_ident()
ss = s.ScarlettSystem(loop)
ss.config.location_name = 'test scarlett'
ss.config.config_dir = get_test_config_dir()
ss.config.latitude = 32.87336
ss.config.longitude = -117.22743
ss.config.elevation = 0
ss.config.time_zone = date_utility.get_time_zone('US/Pacific')
ss.config.units = METRIC_SYSTEM
ss.config.skip_pip = True
# if 'custom_automations.test' not in loader.AVAILABLE_COMPONENTS:
# yield from loop.run_in_executor(None, loader.prepare, ss)
ss.state = s.CoreState.running
return ss
def pre_work(
self,
task,
):
self.update_current_task(
task=task,
)
interval = self.worker_config['timeouts']['soft_timeout']
if interval == 0:
interval = None
self.current_timers[threading.get_ident()] = threading.Timer(
interval=interval,
function=ctypes.pythonapi.PyThreadState_SetAsyncExc,
args=(
ctypes.c_long(threading.get_ident()),
ctypes.py_object(worker.WorkerSoftTimedout),
)
)
self.current_timers[threading.get_ident()].start()
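# Rough standalone sketch of the soft-timeout mechanism used above (the names
# `SoftTimeout` and `arm_soft_timeout` are illustrative, not from the original
# worker): a Timer armed with this thread's get_ident() asks CPython to raise
# an exception asynchronously inside that thread; the caller is expected to
# cancel the timer if the task finishes in time.
import ctypes
import threading

class SoftTimeout(Exception):
    pass

def arm_soft_timeout(seconds):
    timer = threading.Timer(
        interval=seconds,
        function=ctypes.pythonapi.PyThreadState_SetAsyncExc,
        args=(
            ctypes.c_long(threading.get_ident()),  # ident of the calling thread
            ctypes.py_object(SoftTimeout),
        ),
    )
    timer.start()
    return timer  # call timer.cancel() on normal completion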
def __repr__(self, _repr_running={}):
'repr as "MIDict(items, names)"'
call_key = id(self), _get_ident()
if call_key in _repr_running: # pragma: no cover
return '<%s(...)>' % self.__class__.__name__
_repr_running[call_key] = 1
try:
try:
if self.indices:
names = force_list(self.indices.keys())
items = force_list(self.items())
return '%s(%s, %s)' % (self.__class__.__name__, items, names)
except AttributeError: # pragma: no cover
# may not have attr ``indices`` yet
pass
return '%s()' % self.__class__.__name__
finally:
del _repr_running[call_key]
def reject_recursive_repeats(to_wrap):
'''
    Prevent simple cycles by raising a ValueError when called recursively with the same argument instances on the same thread
'''
to_wrap.__already_called = {}
@functools.wraps(to_wrap)
def wrapped(*args):
arg_instances = tuple(map(id, args))
thread_id = threading.get_ident()
thread_local_args = (thread_id,) + arg_instances
if thread_local_args in to_wrap.__already_called:
raise ValueError('Recursively called %s with %r' % (to_wrap, args))
to_wrap.__already_called[thread_local_args] = True
try:
wrapped_val = to_wrap(*args)
finally:
del to_wrap.__already_called[thread_local_args]
return wrapped_val
return wrapped
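# Minimal usage sketch for the decorator above (`resolve` is a hypothetical
# function): re-entering with the same argument on the same thread raises.
@reject_recursive_repeats
def resolve(obj):
    return resolve(obj)  # second call sees the same (thread_id, id(obj)) key

try:
    resolve(object())
except ValueError as exc:
    print(exc)  # Recursively called <function resolve ...> with (...,)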
def test_pthread_kill_main_thread(self):
# Test that a signal can be sent to the main thread with pthread_kill()
# before any other thread has been created (see issue #12392).
code = """if True:
import threading
import signal
import sys
def handler(signum, frame):
sys.exit(3)
signal.signal(signal.SIGUSR1, handler)
signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
sys.exit(2)
"""
with spawn_python('-c', code) as process:
stdout, stderr = process.communicate()
exitcode = process.wait()
if exitcode != 3:
raise Exception("Child error (exit code %s): %s" %
(exitcode, stdout))
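# Standalone sketch of the same call outside the test harness (POSIX only):
# the main thread sends SIGUSR1 to itself via its own get_ident(), and the
# installed handler then runs in that thread.
import signal
import threading

signal.signal(signal.SIGUSR1, lambda signum, frame: print('got SIGUSR1'))
signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)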
def __init__(self, f, n, wait_before_exit=False):
"""
Construct a bunch of `n` threads running the same function `f`.
If `wait_before_exit` is True, the threads won't terminate until
do_finish() is called.
"""
self.f = f
self.n = n
self.started = []
self.finished = []
self._can_exit = not wait_before_exit
def task():
tid = threading.get_ident()
self.started.append(tid)
try:
f()
finally:
self.finished.append(tid)
while not self._can_exit:
_wait()
for i in range(n):
start_new_thread(task, ())
def task(N, done, done_tasks, errors):
try:
# We don't use modulefinder but still import it in order to stress
# importing of different modules from several threads.
if len(done_tasks) % 2:
import modulefinder
import random
else:
import random
import modulefinder
# This will fail if random is not completely initialized
x = random.randrange(1, 3)
except Exception as e:
errors.append(e.with_traceback(None))
finally:
done_tasks.append(threading.get_ident())
finished = len(done_tasks) == N
if finished:
done.set()
# Create a circular import structure: A -> C -> B -> D -> A
# NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
def test_thread_state(self):
# some extra thread-state tests driven via _testcapi
def target():
idents = []
def callback():
idents.append(threading.get_ident())
_testcapi._test_thread_state(callback)
a = b = callback
time.sleep(1)
# Check our main thread is in the list exactly 3 times.
self.assertEqual(idents.count(threading.get_ident()), 3,
"Couldn't find main thread correctly in the list")
target()
t = threading.Thread(target=target)
t.start()
t.join()
def _check_thread(self):
"""Check that the current thread is the thread running the event loop.
Non-thread-safe methods of this class make this assumption and will
likely behave incorrectly when the assumption is violated.
Should only be called when (self._debug == True). The caller is
responsible for checking this condition for performance reasons.
"""
if self._thread_id is None:
return
thread_id = threading.get_ident()
if thread_id != self._thread_id:
raise RuntimeError(
"Non-thread-safe operation invoked on an event loop other "
"than the current one")
def _green_existing_locks():
"""Make locks created before monkey-patching safe.
RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
blocks the native thread. We need to replace these with green Locks.
This was originally noticed in the stdlib logging module."""
import gc
import threading
import eventlet.green.thread
lock_type = type(threading.Lock())
rlock_type = type(threading.RLock())
if sys.version_info[0] >= 3:
pyrlock_type = type(threading._PyRLock())
# We're monkey-patching so there can't be any greenlets yet, ergo our thread
# ID is the only valid owner possible.
tid = eventlet.green.thread.get_ident()
for obj in gc.get_objects():
if isinstance(obj, rlock_type):
if (sys.version_info[0] == 2 and
isinstance(obj._RLock__block, lock_type)):
_fix_py2_rlock(obj, tid)
elif (sys.version_info[0] >= 3 and
not isinstance(obj, pyrlock_type)):
_fix_py3_rlock(obj)
def timeout_response() -> chalice.Response:
"""
Produce a chalice Response object that indicates a timeout. Stacktraces for all running threads, other than the
current thread, are provided in the response object.
"""
frames = sys._current_frames()
current_threadid = threading.get_ident()
trace_dump = {
thread_id: traceback.format_stack(frame)
for thread_id, frame in frames.items()
if thread_id != current_threadid}
problem = {
'status': requests.codes.gateway_timeout,
'code': "timed_out",
'title': "Timed out processing request.",
'traces': trace_dump,
}
return chalice.Response(
status_code=problem['status'],
headers={"Content-Type": "application/problem+json"},
body=json.dumps(problem),
)
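# Standalone sketch of the stack-dump technique used above: sys._current_frames()
# maps thread idents to the top frame of each thread, and filtering out
# threading.get_ident() leaves only the *other* threads' tracebacks.
import sys
import threading
import traceback

def other_thread_traces():
    me = threading.get_ident()
    return {
        tid: ''.join(traceback.format_stack(frame))
        for tid, frame in sys._current_frames().items()
        if tid != me
    }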
def test_with_multi_threading():
test_truncate()
def task(n):
print('In thread {}'.format(threading.get_ident()))
for _ in range(n):
test_insert_one()
threads = [threading.Thread(target=task, args=(100,)) for _ in range(50)]
for t in threads:
t.start()
for t in threads:
t.join()
test_query()
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
if self.is_running():
raise RuntimeError('Event loop is running.')
self._set_coroutine_wrapper(self._debug)
self._thread_id = threading.get_ident()
try:
while True:
self._run_once()
if self._stopping:
break
finally:
self._stopping = False
self._thread_id = None
self._set_coroutine_wrapper(False)
def test_main_thread_after_fork(self):
code = """if 1:
import os, threading
pid = os.fork()
if pid == 0:
main = threading.main_thread()
print(main.name)
print(main.ident == threading.current_thread().ident)
print(main.ident == threading.get_ident())
else:
os.waitpid(pid, 0)
"""
_, out, err = assert_python_ok("-c", code)
data = out.decode().replace('\r', '')
self.assertEqual(err, b"")
self.assertEqual(data, "MainThread\nTrue\nTrue\n")
def test_main_thread_after_fork_from_nonmain_thread(self):
code = """if 1:
import os, threading, sys
def f():
pid = os.fork()
if pid == 0:
main = threading.main_thread()
print(main.name)
print(main.ident == threading.current_thread().ident)
print(main.ident == threading.get_ident())
# stdout is fully buffered because not a tty,
# we have to flush before exit.
sys.stdout.flush()
else:
os.waitpid(pid, 0)
th = threading.Thread(target=f)
th.start()
th.join()
"""
_, out, err = assert_python_ok("-c", code)
data = out.decode().replace('\r', '')
self.assertEqual(err, b"")
self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
def __init__(self, uri):
if isinstance(uri, str):
uri = core.URI(uri)
elif not isinstance(uri, core.URI):
raise TypeError("expected Pyro URI")
self._pyroUri = uri
self._pyroConnection = None
self._pyroSerializer = None # can be set to the name of a serializer to override the global one per-proxy
self._pyroMethods = set() # all methods of the remote object, gotten from meta-data
self._pyroAttrs = set() # attributes of the remote object, gotten from meta-data
self._pyroOneway = set() # oneway-methods of the remote object, gotten from meta-data
self._pyroSeq = 0 # message sequence number
self._pyroRawWireResponse = False # internal switch to enable wire level responses
self._pyroHandshake = "hello" # the data object that should be sent in the initial connection handshake message
self._pyroMaxRetries = config.MAX_RETRIES
self.__pyroTimeout = config.COMMTIMEOUT
self.__pyroOwnerThread = get_ident() # the thread that owns this proxy
if config.SERIALIZER not in serializers.serializers:
raise ValueError("unknown serializer configured")
core.current_context.annotations = {}
core.current_context.response_annotations = {}
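# Standalone sketch of the owner-thread idea behind __pyroOwnerThread above
# (`ThreadOwned` is a hypothetical class, not part of Pyro): remember
# get_ident() at construction time and refuse use from any other thread.
from threading import get_ident

class ThreadOwned:
    def __init__(self):
        self._owner = get_ident()

    def check_owner(self):
        if get_ident() != self._owner:
            raise RuntimeError("object is being used from a thread other "
                               "than the one that created it")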
def schedule(self, f, *args, **kwargs):
"""
    Try to acquire the connection access lock, then call the protocol method.
    Return a concurrent.futures.Future instance that the calling thread can
    wait on.
"""
self.wait_open()
    # RabbitMQ operations are multiplexed between different AMQP
    # method callbacks.  The final result of the protocol method call
    # will be set inside one of these callbacks, so the other thread
    # can wait until that event happens in the connection event loop.
future = Future()
with self.lock:
self.process(get_ident(), (f, args, kwargs), future)
return future
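# Standalone sketch of the same hand-off pattern (all names hypothetical):
# one thread drains a queue of scheduled calls, while other threads receive a
# concurrent.futures.Future and block on it for the result.
import queue
import threading
from concurrent.futures import Future

_jobs = queue.Queue()

def _event_loop():
    while True:
        func, args, kwargs, future = _jobs.get()
        try:
            future.set_result(func(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)

threading.Thread(target=_event_loop, daemon=True).start()

def schedule(func, *args, **kwargs):
    future = Future()
    _jobs.put((func, args, kwargs, future))
    return future

print(schedule(pow, 2, 10).result(timeout=5))  # 1024, computed in the loop thread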
def format_html(fp, exclude=()):
frames = stackframes()
fp.write('<!DOCTYPE html>\n')
fp.write('<html><head><title>{} Traces</title></head><body>\n'.format(len(frames)))
for thread_id, stack in sorted(frames.items(), key=lambda x: x[0]):
name = 'Thread {}'.format(thread_id)
if thread_id == threading.get_ident():
name += ' (tracing thread)'
elif thread_id == main_thread.ident:
name += ' (main)'
fp.write('<h3>{}</h3>\n'.format(name))
tbstr = format_stack(stack)
if pygments:
formatter = pygments.formatters.HtmlFormatter(full=False, noclasses=True)
lexer = pygments.lexers.PythonLexer()
tbstr = pygments.highlight(tbstr, lexer, formatter)
fp.write(tbstr)
fp.write('\n')
    fp.write('</body></html>\n')
def test_async():
from threading import get_ident
main = get_ident()
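    # NOTE: `async` became a reserved keyword in Python 3.7, so the
    # `async=...` argument and the `on_test.async` attribute below only parse
    # on older interpreters.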
@on_test.register(async=True)
def a1():
assert get_ident() != main
@on_test.register()
def a2():
assert get_ident() == main
on_test()
on_test.async = True
@on_test.register() # follows the current setting
def a3():
assert get_ident() != main
on_test()
on_test.async = False
def __init__(self, pool_size):
assert not self.__class__.POOL, "Can't create more than one: %s" % self.__class__
self.__class__.POOL = self
self.active_jobs = set()
self.pool_size = pool_size
self.jobs_queue = Queue()
def work():
while True:
func, name, job_id, parent_uuid = self.jobs_queue.get()
_set_thread_uuid(threading.get_ident(), parent_uuid)
_logger.debug('Starting job in real thread: %s', name or "<anonymous>")
func()
self.active_jobs.remove(job_id)
_logger.debug('ready for the next job')
for i in range(self.pool_size):
name = 'real-thread-%s' % i
thread = threading.Thread(target=work, name=name, daemon=True)
thread.start()
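# Hedged sketch of the per-thread bookkeeping the pool above relies on
# (`_set_thread_uuid` and `_get_thread_uuid` are assumptions about helpers not
# shown in the snippet): a module-level dict keyed by threading.get_ident().
import threading

_THREAD_UUIDS = {}

def _set_thread_uuid(ident, parent_uuid):
    _THREAD_UUIDS[ident] = parent_uuid

def _get_thread_uuid(ident=None):
    if ident is None:
        ident = threading.get_ident()
    return _THREAD_UUIDS.get(ident)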