def test_init_once_multithread():
if sys.version_info < (3,):
import thread
else:
import _thread as thread
import time
#
def do_init():
print('init!')
seen.append('init!')
time.sleep(1)
seen.append('init done')
print('init done')
return 7
ffi = _cffi1_backend.FFI()
seen = []
for i in range(6):
def f():
res = ffi.init_once(do_init, "tag")
seen.append(res)
thread.start_new_thread(f, ())
time.sleep(1.5)
assert seen == ['init!', 'init done'] + 6 * [7]
评论列表
文章目录