def test_no_raise_on_first_failure(self):
statement = SimpleStatement(
"INSERT INTO test3rf.test (k, v) VALUES (%s, %s)",
consistency_level=ConsistencyLevel.QUORUM)
statements = cycle((statement, ))
parameters = [(i, i) for i in range(100)]
# we'll get an error back from the server
parameters[57] = ('efefef', 'awefawefawef')
results = execute_concurrent(self.session, list(zip(statements, parameters)), raise_on_first_error=False)
for i, (success, result) in enumerate(results):
if i == 57:
self.assertFalse(success)
self.assertIsInstance(result, InvalidRequest)
else:
self.assertTrue(success)
self.assertFalse(result)
test_concurrent.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录