def test_thread_safety_during_modification(self):
    """
    Stress a RoundRobinPolicy with concurrent readers and writers.

    Five threads repeatedly materialize query plans while five more call
    ``on_up`` and another five call ``on_down`` with random values in
    0..99. The test fails if any worker thread raises an exception.
    """
    hosts = range(100)
    policy = RoundRobinPolicy()
    policy.populate(None, hosts)

    errors = []

    def record_errors(work):
        # An exception escaping a Thread target is not re-raised by
        # join(), so every worker reports its failure through ``errors``
        # instead of dying unnoticed.
        def runner():
            try:
                work()
            except Exception as exc:
                errors.append(exc)
        return runner

    def check_query_plan():
        for _ in xrange(100):
            list(policy.make_query_plan())

    def host_up():
        for _ in xrange(1000):
            policy.on_up(randint(0, 99))

    def host_down():
        for _ in xrange(1000):
            policy.on_down(randint(0, 99))

    threads = []
    for _ in range(5):
        threads.append(Thread(target=record_errors(check_query_plan)))
        threads.append(Thread(target=record_errors(host_up)))
        threads.append(Thread(target=record_errors(host_down)))

    # Make the interpreter switch threads far more often than usual,
    # maximizing the chance of exposing race conditions. Python 2 and
    # PyPy are driven through the check-interval API; everything else
    # uses the switch-interval API.
    use_check_interval = six.PY2 or '__pypy__' in sys.builtin_module_names
    if use_check_interval:
        get_interval = sys.getcheckinterval
        set_interval = sys.setcheckinterval
        stress_interval = 0
    else:
        get_interval = sys.getswitchinterval
        set_interval = sys.setswitchinterval
        stress_interval = 0.0001

    original_interval = get_interval()
    try:
        set_interval(stress_interval)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    finally:
        # Always restore the interpreter-wide setting so other tests
        # are unaffected.
        set_interval(original_interval)

    if errors:
        self.fail("Saw errors: %s" % (errors,))