def test_handle_layer_error(self):
    """Verify which errors make ``_handle_layer_error`` close the source stream.

    Each case feeds one exception to ``layer_manager._handle_layer_error``
    together with the same context, then checks ``src_stream.close``:

    * ``gen.TimeoutError``, ``DestStreamClosedError`` and
      ``iostream.StreamClosedError`` -> ``close()`` called exactly once,
      with no arguments.
    * ``DestNotConnectedError`` and ``SrcStreamClosedError`` -> ``close()``
      not called.
    """
    context = LayerContext(
        mode="socks", src_stream=self.src_stream, port=443, scheme="h2")

    # (exception class, constructor argument, is src_stream.close() expected?)
    cases = (
        (gen.TimeoutError, "timeout", True),
        (DestNotConnectedError, "stream closed", False),
        (DestStreamClosedError, "stream closed", True),
        (SrcStreamClosedError, "stream closed", False),
        (iostream.StreamClosedError, "stream closed", True),
    )

    for index, (error_type, message, expect_close) in enumerate(cases):
        if index:
            # Wipe the call history recorded by the previous case so the
            # assertions below only see calls made for this error.
            context.src_stream.reset_mock()

        layer_manager._handle_layer_error(error_type(message), context)

        if expect_close:
            context.src_stream.close.assert_called_once_with()
        else:
            context.src_stream.close.assert_not_called()
评论列表
文章目录