def _Dynamic_GetSocketOptions(self, request, response):
    """Answer a GetSocketOptions call by filling in `response`.

    One entry is appended to `response` (via `add_options()`) for every
    option in `request.options_list()`, echoing the requested level and
    option and carrying the resolved value.

    SOL_SOCKET / SO_ERROR is read from the socket returned by
    `self._LookupSocket`; every other option is answered from
    `self._mock_options`.

    Args:
      request: message exposing `socket_descriptor()` and `options_list()`.
      response: message that receives the resulting option entries.

    Raises:
      apiproxy_errors.ApplicationError: with PERMISSION_DENIED when
        `GetMockValue` returns None for a requested option.
    """
    socket_state = self._LookupSocket(request.socket_descriptor())

    for requested in request.options_list():
        pb_option = remote_socket_service_pb.SocketOption
        wants_so_error = (
            requested.level() == pb_option.SOCKET_SOL_SOCKET
            and requested.option() == pb_option.SOCKET_SO_ERROR)

        if not wants_so_error:
            # Anything other than SO_ERROR is served from the mock table;
            # a None result means the option may not be read.
            mock_value = self._mock_options.GetMockValue(
                requested.level(), requested.option())
            if mock_value is None:
                raise apiproxy_errors.ApplicationError(
                    RemoteSocketServiceError.PERMISSION_DENIED,
                    'Attempt to get blocked socket option.')

            entry = response.add_options()
            entry.set_level(requested.level())
            entry.set_option(requested.option())
            entry.set_value(mock_value)
            continue

        # SO_ERROR: the entry is appended before the socket is queried.
        entry = response.add_options()
        entry.set_level(requested.level())
        entry.set_option(requested.option())
        # Passing a buffer length makes getsockopt return the raw bytes
        # buffer instead of a decoded integer.
        raw_error = socket_state.sock.getsockopt(
            socket.SOL_SOCKET, socket.SO_ERROR, 1024)
        entry.set_value(raw_error)
_remote_socket_stub.py 文件源码
python
阅读 39
收藏 0
点赞 0
评论 0
评论列表
文章目录