def test_no_raise_on_first_failure_client_side(self):
statement = SimpleStatement(
"INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
consistency_level=ConsistencyLevel.QUORUM)
statements = cycle((statement, ))
parameters = [(i, i) for i in range(100)]
# the driver will raise an error when binding the params
parameters[57] = 1
results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
for i, (success, result) in enumerate(results):
if i == 57:
self.assertFalse(success)
self.assertIsInstance(result, TypeError)
else:
self.assertTrue(success)
self.assertFalse(result)
test_concurrent.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录