def test_handle_schema_change(self):
    """Check that schema-change events schedule a schema refresh.

    For every public attribute of ``SchemaChangeType`` the control
    connection is handed first a table-level event and then a
    keyspace-level event. Each time, ``scheduler.schedule_unique`` must have
    been called exactly once with ``refresh_schema`` as the callback and the
    event's fields forwarded as keyword arguments.
    """
    # Collect every public value declared on SchemaChangeType so that newly
    # added change types are covered without touching this test.
    change_types = [
        getattr(SchemaChangeType, attr)
        for attr in vars(SchemaChangeType)
        if not attr.startswith('_')
    ]

    for change_type in change_types:
        event = {
            'target_type': SchemaTargetType.TABLE,
            'change_type': change_type,
            'keyspace': 'ks1',
            'table': 'table1',
        }

        # Table-level event. Reset first so calls recorded by earlier
        # iterations cannot affect assert_called_once_with.
        self.cluster.scheduler.reset_mock()
        self.control_connection._handle_schema_change(event)
        # The first positional argument is deliberately unconstrained (ANY);
        # presumably it is the scheduling delay -- confirm against the
        # scheduler's signature.
        self.cluster.scheduler.schedule_unique.assert_called_once_with(
            ANY, self.control_connection.refresh_schema, **event)

        # Keyspace-level event: reuse the same dict, switching the target
        # type and dropping the table name, which no longer applies.
        self.cluster.scheduler.reset_mock()
        event['target_type'] = SchemaTargetType.KEYSPACE
        del event['table']
        self.control_connection._handle_schema_change(event)
        self.cluster.scheduler.schedule_unique.assert_called_once_with(
            ANY, self.control_connection.refresh_schema, **event)
test_control_connection.py 文件源码
python
阅读 28
收藏 0
点赞 0
评论 0
评论列表
文章目录