def test_single_connection(self):
"""
Test a single connection with sequential requests.
"""
conn = self.get_connection()
query = "SELECT keyspace_name FROM system.schema_keyspaces LIMIT 1"
event = Event()
def cb(count, *args, **kwargs):
count += 1
if count >= 10:
conn.close()
event.set()
else:
conn.send_msg(
QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
request_id=0,
cb=partial(cb, count))
conn.send_msg(
QueryMessage(query=query, consistency_level=ConsistencyLevel.ONE),
request_id=0,
cb=partial(cb, 0))
event.wait()
test_connection.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录